grasys blog

GenAI Processorsを触ってみる 2/4 ~ Processorについて ~

はじめまして!エンジニアのUemaです。

今回は、AIパイプラインをシンプルに書けるように設計された、Google DeepMind の新しいオープンソース Python ライブラリである「GenAI Processors」をGet Startedやexampleを参考にしながら触ってみたいと思います。

シンプルに書けるとはいえ、ライブラリの構成要素やコンセプトなどを理解しないと混乱するため、それらもまとめてみました。

4部構成となっていて、今回はPart2となっています。

Part2は、受け取った ProcessorPart の処理を担う Processor について、まとめてみました。

シリーズの他の記事はこちらから見られます。
『GenAI Processorsを触ってみる』シリーズ一覧

GenAI Processorsとは

Part 1でも書きましたが、

AIパイプラインを構築でき、効率的な並列コンテンツ処理を可能にする軽量の Python ライブラリです。

テキスト、音声、画像といった複数のデータ形式(マルチモーダル)を組み合わせ、リアルタイムで処理するような複雑なアプリケーションの開発を効率化することに特化しています。

Processor

Processor Intro Colab より

コアコンセプト

GenAI Processorsライブラリは、ProcessorPartProcessorPartProcessor を中心に構成されています。ProcessorPartProcessorについては、下記の通りです。

  • Processor: ProcessorPart オブジェクトの非同期ストリーム(AsyncIterable)を入力として受け取り、ProcessorPart オブジェクトの非同期ストリームを出力として返します。Processorを連結して、複雑なパイプラインを形成することができます。
  • PartProcessor: ストリーム内のパートを個別に処理できる場合に特化したProcessorです。ProcessorPart を受け取り、ProcessorPart の非同期ストリームを返します。入力ストリーム内の各 ProcessorPart に対して PartProcessor を並行して呼び出し、出力を正しい順序で組み立てる処理を行うことによって、遅延を少なくします。

ProcessorPartPartProcessorは名前が似ていますが、異なる概念を指しているため混同しやすいです。

  • ProcessorPartは、単一のコンテンツを表すデータオブジェクトです。
  • PartProcessorは、個々のProcessorPartを操作するように設計されたProcessorです。

Processor の作成

Processor の作成として下記2つの方法があります。

  • 非同期ジェネレータ関数に@processor.processor_functionデコレータを使用する
  • processor.Processorクラスを継承してcall()メソッドを実装する

上記2つの方法はどちらもジェネレータとして、yieldProcessorPartを返す必要があります。

from collections.abc import AsyncIterable

from genai_processors import content_api, processor
from genai_processors.core import preamble


# デコレータを使ってProcessor作成
@processor.processor_function
async def simple_text_processor(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
    """ドットを'[EoS]'に置き換えます。"""
    async for part in content:
        if content_api.is_text(part.mimetype):
            yield content_api.ProcessorPart(part.text.replace(".", "[EoS]"))
        else:
            yield part


# クラス継承を使ってProcessor作成
class SimpleTextProcessor(processor.Processor):
    """SimpleTextProcessorクラス"""

    def __init__(self, eos_string: str) -> None:
        self._eos = eos_string
        # Preambleはコンテンツストリームにプレフィックスを追加します。
        self._preamble = preamble.Preamble(content="開始します.")

    async def call(
        self,
        content: AsyncIterable[content_api.ProcessorPart],
    ) -> AsyncIterable[content_api.ProcessorPart]:
        """ドットを'[EoS]'に置き換えます。"""
        async for part in self._preamble(content):
            if content_api.is_text(part.mimetype):
                yield content_api.ProcessorPart(part.text.replace(".", self._eos))
            else:
                yield part

Processorの適用

作成したProcessorを使ってみたいと思います。

非同期実行の場合、async for を使用してProcessorをイテレートすることで、入力ストリームにProcessorを適用できます。

GenAI Processorsでは非同期実行が推奨されています。同期実行を知りたい場合は、こちらのSynchronous Applicationを参照してください。

import asyncio

from genai_processors import streams


async def main():
    print("\n非同期出力(デコレータ):")
    input_parts = ["Hello,", "Processor", "Decorator."]
    input_stream = streams.stream_content(input_parts)
    async for part in simple_text_processor(input_stream):
        print(part.text)

    print("\n非同期出力(クラス):")
    input_parts = ["Hello,", "Processor", "Class."]
    input_stream = streams.stream_content(input_parts)
    stp = SimpleTextProcessor("[EOS]")
    async for part in stp(input_stream):
        print(part.text)


if __name__ == "__main__":
    asyncio.run(main())

Processorの連結

Processorの連結は、このライブラリの目玉の機能です。

+ 演算子を使用することでProcessorを連結することができます。

ProcessorPartが、statusまたはdebugサブストリームである場合、連結の動作が異なります。生成されるとすぐに呼び出し元に返され、チェーン内の次のProcessorには渡されません。

import asyncio
from collections.abc import AsyncIterable

from genai_processors import content_api, processor, streams


@processor.processor_function
async def simple_text_processor_with_status(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
    """ドットを'[EoS]'に置き換えます。"""
    async for part in content:
        if content_api.is_text(part.mimetype):
            yield content_api.ProcessorPart(part.text.replace(".", "[EoS]"))
            yield processor.status(
                f"Processorが完了しました: 元のテキスト: {part.text}",
            )  # 'status'サブストリームは、次のProcessorに渡されない
            # # 下記でも可
            # yield content_api.ProcessorPart(
            #     f"Processorが完了しました: part.text: {part.text}",
            #     substream_name="status",
            # )
        else:
            yield part


@processor.processor_function
async def another_text_processor(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
    """すべてを小文字に変換します。"""
    async for part in content:
        if content_api.is_text(part.mimetype):
            yield content_api.ProcessorPart(part.text.lower())
        else:
            yield part


async def main():
    print("\n連結されたProcessorの出力:")

    input_parts = ["First.", "Second."]
    chained_processor = simple_text_processor_with_status + another_text_processor
    input_streams = streams.stream_content(input_parts)
    async for part in chained_processor(input_streams):
        print(part.text)


if __name__ == "__main__":
    asyncio.run(main())

Processorの並列実行

Processorは、parallel_concat() 関数を使用することで並列に実行できます。

下記の処理結果だと、another_text_processor の後に simple_text_processor_with_status が出力されます。parallel_concat に渡されたProcessorのリストの順序に従います。

import asyncio
from collections.abc import AsyncIterable

from genai_processors import content_api, processor, streams


@processor.processor_function
async def simple_text_processor_with_status(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
    """ドットを'[EoS]'に置き換えます。"""
    # 「Processorの連結」のサンプルコードを参照


@processor.processor_function
async def another_text_processor(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
    """すべてを小文字に変換します。"""
    # 「Processorの連結」のサンプルコードを参照


async def main():
    print("\n並列Processorの出力:")
    input_stream = streams.stream_content(["First.", "Second."])
    p = [another_text_processor, simple_text_processor_with_status]
    p = processor.parallel_concat(p)  # Processorの並列実行
    async for part in p(input_stream):
        print(part.text)


if __name__ == "__main__":
    asyncio.run(main())

Processorのスイッチ

Processorを切り替えたい場合には、switch モジュールを使うことによって実現できます。

最初に一致したcase条件が使用されます。どの条件にも一致しない場合は、defaultが実行されます。defaultがない場合は、ProcessorPartは返されないです。

import asyncio
import re
from collections.abc import AsyncIterable

from genai_processors import content_api, processor, streams, switch
from genai_processors.content_api import ProcessorPart


@processor.processor_function
async def simple_text_processor_with_status(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
    """ドットを'[EoS]'に置き換えます。"""
    # 「Processorの連結」のサンプルコードを参照


@processor.processor_function
async def another_text_processor(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
    """すべてを小文字に変換します。"""
    # 「Processorの連結」のサンプルコードを参照


async def main():
    print("\nSwitch Processorの出力:")

    input_stream = streams.stream_content(
        [
            content_api.ProcessorPart("A1.", substream_name="a"),
            content_api.ProcessorPart("B1.", substream_name="b"),
            content_api.ProcessorPart("A2.", substream_name="a"),
            content_api.ProcessorPart("B2.", substream_name="b"),
            content_api.ProcessorPart("C1.", substream_name="c"),
            content_api.ProcessorPart("B3.", substream_name="b"),
        ],
    )
    m = (
        switch.Switch(content_api.get_substream_name)
        .case("a", another_text_processor)
        .case("b", simple_text_processor_with_status)
        .default(processor.passthrough())
    )
    async for part in m(input_stream):
        print(part.text)

    print("\nSwitch Processorの出力(カスタム条件):")

    def is_upper(p: ProcessorPart) -> bool:
        """大文字が含まれているかを判定"""
        if content_api.is_text(p.mimetype):
            return bool(re.search(r"[A-Z]", p.text))
        return False

    input_stream = streams.stream_content(
        [
            content_api.ProcessorPart("A1."),
            content_api.ProcessorPart("B1."),
            content_api.ProcessorPart("a2."),
            content_api.ProcessorPart("B2."),
        ],
    )
    m = (
        switch.Switch()
        .case(is_upper, another_text_processor)
        .default(processor.passthrough())
    )
    async for part in m(input_stream):
        print(part.text)


if __name__ == "__main__":
    asyncio.run(main())

GenAIモデルをProcessorとして使用する

このライブラリには、Googleの生成AIモデルと連携するためのProcessorが用意されています。

Google AI StudioからAPI Keyを作成して、GenaiModel でProcessorを生成します。

debug.TTFTSingleStream プロセッサを使うことで、元のロジックを維持したまま、呼び出しから最初の出力までの時間を記録することができます。

from genai_processors import debug
from genai_processors.core import genai_model
from google.genai import types

# GenAIモデル Processorを初期化します
# 'gemini-2.0-flash'を目的のモデル名に置き換えてください
genai_processor = genai_model.GenaiModel(
    api_key=API_KEY,  # Google AI StudioからAPI Keyを作成する
    model_name="gemini-2.0-flash",
    generate_content_config=types.GenerateContentConfig(temperature=0.7),
)

# GenAI Processorを、すべての入力を小文字に変換するProcessorと連結します。
genai_pipeline = (
    another_text_processor
    # TTFTが適用されるProcessorに「GenAIモデル」というタグを追加します
    + debug.TTFTSingleStream("GenAI Model", genai_processor)
)

この記事を書いている時点でのGenAI Processorsの GenaiModel では、Vertex AI Gemini API は対応しておりませんでした。そのため、元の GenaiModel を参考に新しくProcessorを作成してみました。新しく作成したGenAIモデルは、Part4の「Exampleを触ってみる」に記載してます。

PartProcessor の操作

PartProcessorの作成としてProcessorと同じ様な方法で下記2つの方法があります。

  • 非同期ジェネレータ関数に @processor.part_processor_function デコレータを使用する
  • processor.PartProcessor クラスを継承してcall()メソッドを実装する

match 関数を使うことで、フィルターを作成することができます。これによって、mimetypeに対して処理を追加するなどができます。

フィルターは、PartProcessor を継承したクラスで match メソッドをオーバーライドするか、下記のサンプルコードのように@processor.part_processor_function デコレータの引数に渡すことで実装できます。

match 関数は、ProcessorPart を引数で受け取り、bool を返す様に実装します。

PartProcessor Processor インターフェースを実装しておらず、Processor と混同されることがあります。ProcessorPartを期待し、AsyncIterableではないため、エラーとなります。

to_processor()メソッドを使用して PartProcessorProcessor にキャストすることで正常に動作します。

import asyncio
from collections.abc import AsyncIterable

from genai_processors import content_api, processor, streams


def match_text(part: content_api.ProcessorPart) -> bool:
    return content_api.is_text(part.mimetype)  # text/plainは実行
    # return content_api.is_audio(part.mimetype)


@processor.part_processor_function(match_fn=match_text)
async def duplicate_part(
    part: content_api.ProcessorPart,
) -> AsyncIterable[content_api.ProcessorPart]:
    """入力パートを複製します。"""
    yield part
    yield part
    yield processor.status(f"テキストを複製しました: {part.text}")


async def main():
    input_parts_duplicate = streams.stream_content(["A", "B"])

    # `duplicate_part`を入力*ストリーム*に適用するには、Processorが必要です。
    p = duplicate_part.to_processor()

    print("\nPart Processorの出力:")
    async for part in p(input_parts_duplicate):
        print(part.text)


if __name__ == "__main__":
    asyncio.run(main())

PartProcessor の並列実行

// 演算子を使用して、複数の PartProcessor オブジェクトを並列に実行できます。// 演算子は、PartProcessor にのみ適用されます。

並列で動いている複数のPartProcessor には、ProcessorPart オブジェクトがコピーされずに参照渡しで同じオブジェクトが渡されるため、ProcessorPart可変属性を更新するとバグが生じる可能性があります。

どの PartProcessor から何も返されない場合には、全体からも何も返らないです。そのため、そのまま返される様な processor.PASSTHROUGH_ALWAYS と言ったものもあります。

import asyncio
from collections.abc import AsyncIterable

from genai_processors import content_api, processor, streams


@processor.part_processor_function
async def append_star(
    part: content_api.ProcessorPart,
) -> AsyncIterable[content_api.ProcessorPart]:
    """テキストにアスタリスクを追加します。"""
    print(f"メモリアドレス: append_star: {id(part)}")
    if content_api.is_text(part.mimetype):
        yield content_api.ProcessorPart(part.text + "*")


@processor.part_processor_function
async def append_hash(
    part: content_api.ProcessorPart,
) -> AsyncIterable[content_api.ProcessorPart]:
    """テキストにハッシュを追加します。"""
    print(f"メモリアドレス: append_hash: {id(part)}")
    if content_api.is_text(part.mimetype):
        yield content_api.ProcessorPart(part.text + "#")


async def main():
    print("\n並列Part Processorsの出力:")
    parallel_processors = append_star // append_hash // processor.PASSTHROUGH_ALWAYS
    input_parts_parallel = streams.stream_content(
        [
            "Item_1",
            "Item_2",
            content_api.ProcessorPart(b"", mimetype="audio/l16;rate=24000"),
        ],
    )

    async for part in parallel_processors.to_processor()(input_parts_parallel):
        print(part)


if __name__ == "__main__":
    asyncio.run(main())

// 演算子は、入力タイプによって入力を前処理する場合に便利です。

p1 = processor.create_filter(content_api.is_image) + image_processor
p2 = processor.create_filter(content_api.is_audio) + audio_processor
total_processor = p1 // p2 // processor.PASSTHROUGH_FALLBACK

PartProcessor のスイッチ

Processorと同様に switch モジュールの PartSwitch で PartProcessor を振り分ける事ができます。

条件の設定などはどちらも同じです。

PartProcessorの PartSwitch と Processorの Switch の違いとしては、PartSwitch は入力と出力の順序が一致します。Switch は、入力と出力が一致しません。

import asyncio
from collections.abc import AsyncIterable

from genai_processors import content_api, processor, streams, switch


@processor.part_processor_function
async def append_star(
    part: content_api.ProcessorPart,
) -> AsyncIterable[content_api.ProcessorPart]:
    """テキストにアスタリスクを追加します。"""
    # 「PartProcessor の並列実行」を参照


@processor.part_processor_function
async def append_hash(
    part: content_api.ProcessorPart,
) -> AsyncIterable[content_api.ProcessorPart]:
    """テキストにハッシュを追加します。"""
    # 「PartProcessor の並列実行」を参照


async def main():
    input_stream = streams.stream_content(
        [
            content_api.ProcessorPart("a1", substream_name="a"),
            content_api.ProcessorPart("b1", substream_name="b"),
            content_api.ProcessorPart("a2", substream_name="a"),
            content_api.ProcessorPart("b2", substream_name="b"),
            content_api.ProcessorPart("b3", substream_name="b"),
        ],
    )

    m = (
        switch.PartSwitch(content_api.get_substream_name)
        .case("a", append_star)
        .case("b", append_hash)
        .default(processor.passthrough())
    )

    print("\nPartSwitchの出力:")
    p = m.to_processor()
    async for part in p(input_stream):
        print(part.text)


if __name__ == "__main__":
    asyncio.run(main())

streams と AsyncIterable の操作

これまでのサンプルコードを見ているとGenAI Processorsでは、AsyncIterable で動作します。streams モジュールには、ストリームを管理するための便利な機能を提供してます。

IterableをAsyncIterableに変換

streams.stream_content 関数は、リストなどのiterableを AsyncIterable に変換します。これまでのサンプルコードでもよく使用されており、テストでよく使用されます。

ストリームをリストに収集する

streams.gather_stream 関数は、AsyncIterable ストリームのアイテムをPythonのリストに集約します。有限なストリームである場合に、全体を取得する場合に便利です。

import asyncio
from collections.abc import AsyncIterable

from genai_processors import content_api, processor, streams


@processor.processor_function
async def another_text_processor(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
    """すべてを小文字に変換します。"""
    async for part in content:
        if content_api.is_text(part.mimetype):
            yield content_api.ProcessorPart(part.text.lower())


async def main():
    input_parts = ["First.", "Second."]
    input_streams = streams.stream_content(input_parts)
    gathered_list = await streams.gather_stream(another_text_processor(input_streams))

    print("\nストリームから収集したリスト:")
    print(gathered_list)


if __name__ == "__main__":
    asyncio.run(main())

ストリームの分割とマージ

streams.split 関数は、単一のストリームを複数の同一ストリームに分割します。

streams.merge 関数と streams.concat 関数は、複数のストリームを単一のストリームにマージします。

import asyncio
from collections.abc import AsyncIterable

from genai_processors import content_api, processor, streams


@processor.processor_function
async def append_a(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
    async for part in content:
        yield content_api.ProcessorPart(part.text + "A")


@processor.processor_function
async def append_b(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
    async for part in content:
        yield content_api.ProcessorPart(part.text + "B")


@processor.processor_function
async def append_c(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
    async for part in content:
        yield content_api.ProcessorPart(part.text + "C")


async def main():
    print("\n分割とマージ")

    initial_stream = streams.stream_content(
        ["Start", "Finish"],
        # 各アイテムをyieldした後に遅延を追加します。これにより、「Start」アイテムが
        # 最初にyieldされるようになります。
        with_delay_sec=0.001,
    )

    # ストリームを3つに分割(initial_streamが3つになる感じ)
    stream1, stream2, stream3 = streams.split(initial_stream, n=3)

    # 各ストリームを独立して処理
    processed_stream1 = append_a(stream1)
    processed_stream2 = append_b(stream2)
    processed_stream3 = append_c(stream3)

    # 処理されたストリームをマージ
    merged_stream = streams.merge(
        [
            processed_stream1,
            processed_stream2,
            processed_stream3,
        ],
    )

    async for part in merged_stream:
        print(part.text)


if __name__ == "__main__":
    asyncio.run(main())

外部入力からストリームを作成

@processor.source デコレータを使うことで、入力がマイクやカメラなどの外部ソースを受け取ることができます。

自分自身で作成することもできますが、text.terminal_input でターミナルからのテキストの受け取りや audio_io.PyAudioIn でマイクからの音声データを受け取る事ができます。

@processor.source デコレータや text.terminal_input などは、Processorに変換されるため、+ 演算子でProcessorを連結することもできます。

import asyncio
from collections.abc import AsyncIterable

from genai_processors import content_api, processor
from genai_processors.core import text


@processor.part_processor_function
async def hello(part: content_api.ProcessorPart) -> AsyncIterable[content_api.ProcessorPart]:
    if content_api.is_text(part.mimetype) and part.text != "":
        yield content_api.ProcessorPart(f"Hello, {part.text}")


async def main():
    print("\nTerminalから入力:")

    p_hello = hello.to_processor()
    async for part in p_hello(text.terminal_input()):
        print(part.text)


if __name__ == "__main__":
    asyncio.run(main())

終わりに

結構なボリュームになりましたが、GenAI Processorを使用していく上で重要となるProcessorについてまとめてみました。

Processorを連結させたり、並列実行させたりなど機能が盛り沢山でした。

書ききれてないところもあると思うので、 Processor Intro Colab を見てみると良いかもしれません。

次回は、カスタムProcessorの作成についてです。

参考

こちらのリンクからシリーズ記事を一覧で見られます。よろしければこちらもご覧ください!
『GenAI Processorsを触ってみる』シリーズ一覧


採用情報
お問い合わせ