grasys blog

GenAI Processorsを触ってみる 4/4 ~ Exampleを動かしてみる ~

はじめまして!エンジニアのUemaです。

今回は、AIパイプラインをシンプルに書けるように設計された、Google DeepMind の新しいオープンソース Python ライブラリである「GenAI Processors」をGet Startedやexampleを参考にしながら触ってみたいと思います。

シンプルに書けるとはいえ、ライブラリの構成要素やコンセプトなどを理解しないと混乱するため、それらもまとめてみました。

4部構成となっていて、今回は最後のPart4となっています。

Part4は、githubにあるExampleの1つを試していきます。個人的にやりたいことがうまくいかなかったところで、それに対する対応もやってみました。

シリーズの他の記事はこちらから見られます。
『GenAI Processorsを触ってみる』シリーズ一覧

GenAI Processorsとは

Part 1でも書きましたが、

AIパイプラインを構築でき、効率的な並列コンテンツ処理を可能にする軽量の Python ライブラリです。

テキスト、音声、画像といった複数のデータ形式(マルチモーダル)を組み合わせ、リアルタイムで処理するような複雑なアプリケーションの開発を効率化することに特化しています。

Exampleを触ってみる

githubのexamplesのReal-Time Live Example を参考に動かしてみたいと思います。

しかし、examplesにあるものは、Gemini Developer APIを使用するサンプルとなっています。今のところ、GenaiModelGemini Developer APIだけしか対応していませんでした。

個人的にはVertex AI Gemini APIを使いたかったので、GenaiModel を参考にカスタムプロセッサを作成しました。

ライブラリのインストール

下記のコードに必要そうなライブラリをuvを使ってインストールしてます。

uv add genai-processors pyaudio python-dotenv termcolor

カスタムプロセッサ(genai_model_for_vertexai.py)
"""genai_model_for_vertexai.py

genai_processors.genai_modelを参考に作成
"""

from collections.abc import AsyncIterable

from genai_processors import content_api, processor
from genai_processors.core.genai_model import genai_response_to_metadata
from google.genai import client, types


class GenaiModelForVertexAI(processor.Processor):
    """Vertex AI Gemini APIのGenaiModel"""

    def __init__(
        self,
        project_id: str,
        model_name: str,
        location: str = "location",
        api_key: str | None = None,
        generate_content_config: (types.GenerateContentConfigOrDict | None) = None,
        debug_config: client.DebugConfig | None = None,
        http_options: (types.HttpOptions | types.HttpOptionsDict | None) = None,
    ) -> None:
        self._client = client.Client(
            vertexai=True,
            project=project_id,
            location=location,
            api_key=api_key,
            debug_config=debug_config,
            http_options=http_options,
        )
        self._model_name = model_name
        self._generate_content_config = generate_content_config
        self._parser = None

    async def _generate_from_api(
        self,
        content: AsyncIterable[content_api.ProcessorPartTypes],
    ) -> AsyncIterable[content_api.ProcessorPartTypes]:
        """Internal method to call the GenAI API and stream results."""
        turn = types.Content(parts=[])
        contents = []
        async for content_part in content:
            cp = content_api.ProcessorPart(content_part)
            if turn.role and cp.role != turn.role:
                contents.append(turn)
                turn = types.Content(parts=[])

            turn.role = cp.role or "user"
            turn.parts.append(content_api.to_genai_part(cp))  # pylint: disable=attribute-error

        if turn.role:
            contents.append(turn)

        if not contents:
            return

        async for res in await self._client.aio.models.generate_content_stream(
            model=self._model_name,
            contents=contents,
            config=self._generate_content_config,
        ):
            if res.candidates:
                content = res.candidates[0].content
                if content and content.parts:
                    for part in content.parts:
                        yield processor.ProcessorPart(
                            part,
                            metadata=genai_response_to_metadata(res),
                            role=content.role or "model",
                        )

    async def call(
        self,
        content: AsyncIterable[content_api.ProcessorPartTypes],
    ) -> AsyncIterable[content_api.ProcessorPartTypes]:
        """Processor logic"""
        api_stream = self._generate_from_api(content)

        if self._parser:
            async for part in self._parser(api_stream):
                yield part
        else:
            async for part in api_stream:
                yield part

実行コード(realtime_simple_cli.py)

"""realtime_simple_cli.py"""

import asyncio
import os
from collections.abc import Sequence

import pyaudio
import termcolor
from absl import app
from dotenv import load_dotenv
from genai_processors import content_api, context, processor
from genai_processors.core import audio_io, rate_limit_audio, realtime, speech_to_text, text, text_to_speech
from google.cloud import speech_v2
from google.cloud.speech_v2.types import ExplicitDecodingConfig
from google.genai import types

from genai_model_for_vertexai import GenaiModelForVertexAI

load_dotenv()

GOOGLE_PROJECT_ID = os.environ["GOOGLE_PROJECT_ID"]
SAMPLE_RATE_HERTZ = 24000

INSTRUCTION_PARTS = [
    "あなたはユーザーと会話するエージェントです。"
    "ユーザーにとって会話が生き生きとして興味深いものになるよう努めてください。"
    "ジョークを飛ばしたり、見たり聞いたりしたことに関連した興味深い事実を説明したり、何が起こるかを予測したり、何らかの行動や反応を判断したりすることもできます。"
    "ユーザーには最大でも数文で応答してください。短く、興味深く、長々とした独白は避けてください。"
    "Google検索を使って、ユーザーの質問に追加情報を加えたり、興味深いニュースや事実を見つけ出したりすることもできます。",
]


@processor.create_filter
def _filter_parts(part: content_api.ProcessorPart) -> bool:
    if context.is_reserved_substream(part.substream_name):
        return False
    return not (content_api.is_audio(part.mimetype) and part.role.lower() == "model")


async def run_conversation() -> None:
    """Run conversation"""
    pya = pyaudio.PyAudio()

    input_processor = audio_io.PyAudioIn(pya) + speech_to_text.SpeechToText(
        project_id=GOOGLE_PROJECT_ID,
        with_interim_results=False,
        recognition_config=speech_v2.RecognitionConfig(
            explicit_decoding_config=ExplicitDecodingConfig(
                sample_rate_hertz=SAMPLE_RATE_HERTZ,
                encoding=ExplicitDecodingConfig.AudioEncoding.LINEAR16,
                audio_channel_count=1,
            ),
            language_codes=["ja-JP"],  # 日本語で行うため
            model="latest_long",
        ),
    )

    genai_processor = _filter_parts + GenaiModelForVertexAI(
        project_id=GOOGLE_PROJECT_ID,
        location="global",
        model_name="gemini-2.5-flash",
        generate_content_config=types.GenerateContentConfig(
            system_instruction=[part.text for part in content_api.ProcessorContent(INSTRUCTION_PARTS)],
            response_modalities=["TEXT"],
            tools=[types.Tool(google_search=types.GoogleSearch())],
        ),
    )

    tts = text_to_speech.TextToSpeech(
        project_id=GOOGLE_PROJECT_ID,
    ) + rate_limit_audio.RateLimitAudio(
        sample_rate=SAMPLE_RATE_HERTZ,
        delay_other_parts=True,
    )

    play_output = audio_io.PyAudioOut(pya)

    conversation_agent = input_processor + realtime.LiveProcessor(turn_processor=genai_processor + tts) + play_output

    async for part in conversation_agent(text.terminal_input()):
        match part.role:
            case "user":
                color = "green"
            case "model":
                color = "red"
            case _:
                color = "yellow"
        part_role = part.role or "default"
        print(
            termcolor.colored(
                f"{part_role}: {part.text}",
                color,
                "on_grey",
                attrs=["bold"],
            ),
        )


def main(argv: Sequence[str]) -> None:
    """Main"""
    del argv  # Unused.
    if not GOOGLE_PROJECT_ID:
        msg = "Project ID is not set. Define a GOOGLE_PROJECT_ID environment variable obtained from your Cloud project."
        raise ValueError(msg)
    asyncio.run(run_conversation())


if __name__ == "__main__":
    app.run(main)

Gemini APIと会話することができました!

おわりに

今回はExampleを参考にProcessorを組み合わせることで、AIパイプラインを作ることができました。

複雑なAIパイプラインもプロセッサを組み合わせることによって、簡単に作成できそうだなと感じました。

参考

こちらのリンクからシリーズ記事を一覧で見られます。よろしければこちらもご覧ください!
『GenAI Processorsを触ってみる』シリーズ一覧


採用情報
お問い合わせ