【Python】asyncioとconcurrent.futuresを組み合わせて効率的な並行処理を実装する方法

はじめに

Pythonでは非同期処理を行う方法がいくつか存在します。その中でも、asyncioは非同期I/Oに強く、concurrent.futuresはCPU負荷の高い処理やブロッキング処理に向いています。しかし、両者を別々に使うだけでは、WebリクエストのようなI/O処理と、画像処理や数値計算のようなCPU処理を効率よく同時に扱うことは難しいです。

本記事では、asyncioconcurrent.futuresを組み合わせることで、I/OとCPU処理を同時に走らせる設計を実践的に解説していきます。特に、小規模事業者や個人開発者がWebスクレイピングやデータ処理を効率化する際に役立つ内容です。

asyncioとconcurrent.futuresの役割の違い

まずは、それぞれのライブラリの役割を整理します。

  • asyncio: イベントループを用いた非同期I/O処理。ネットワーク通信やファイルI/Oのような待ち時間の多い処理に適しています。
  • concurrent.futures: スレッドまたはプロセスプールを提供し、CPU処理やブロッキング関数を並列化できます。

例えば、Web APIからデータを取得して、取得したデータを画像処理や解析に回すケースを考えると、API呼び出しはasyncioで、解析処理はProcessPoolExecutorで並行実行するのが効率的です。

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_task(x: int) -> int:
    # 仮の重い計算処理
    return sum(i * i for i in range(10**6)) + x

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_task, 42)
        print("計算結果:", result)

asyncio.run(main())

上記コードでは、CPUに重い処理をProcessPoolExecutorに投げつつ、非同期ループの中で結果を待つことができます。

I/O処理とCPU処理を同時に扱う設計

asyncioのイベントループはI/Oタスクを効率的に扱いますが、CPU処理をそのまま書くとループをブロックしてしまいます。そこで、loop.run_in_executorを利用してCPUタスクを外部のスレッド/プロセスプールに渡し、I/Oタスクと共存させるのが重要です。

次の例では、複数のAPIリクエストを非同期に処理しつつ、その結果をCPU集約的な処理に流しています。

import asyncio
import aiohttp
from concurrent.futures import ProcessPoolExecutor

def heavy_analysis(data: str) -> str:
    return f"分析済み: {len(data)}文字"

async def fetch(session, url):
    async with session.get(url) as resp:
        return await resp.text()

async def main():
    urls = ["https://example.com", "https://httpbin.org/get"]
    async with aiohttp.ClientSession() as session:
        texts = await asyncio.gather(*(fetch(session, u) for u in urls))
        
        loop = asyncio.get_running_loop()
        with ProcessPoolExecutor() as pool:
            results = await asyncio.gather(
                *(loop.run_in_executor(pool, heavy_analysis, t) for t in texts)
            )
        print(results)

asyncio.run(main())

このように設計することで、ネットワークI/OとCPU処理を効率的に両立できます。

実践コード:Webリクエストと画像処理の並行実行

より実用的な例として、Webから画像を取得して、その場でリサイズ処理を行うケースを考えます。Pillowを利用し、I/OとCPU処理を同時に扱う流れを紹介します。

import asyncio
import aiohttp
from concurrent.futures import ProcessPoolExecutor
from PIL import Image
import io

def resize_image(data: bytes, size=(128, 128)):
    img = Image.open(io.BytesIO(data))
    return img.resize(size)

async def download_image(session, url):
    async with session.get(url) as resp:
        return await resp.read()

async def main():
    urls = ["https://httpbin.org/image/png", "https://httpbin.org/image/jpeg"]
    async with aiohttp.ClientSession() as session:
        images = await asyncio.gather(*(download_image(session, u) for u in urls))

        loop = asyncio.get_running_loop()
        with ProcessPoolExecutor() as pool:
            resized = await asyncio.gather(
                *(loop.run_in_executor(pool, resize_image, img) for img in images)
            )
        for i, img in enumerate(resized):
            img.save(f"resized_{i}.png")

asyncio.run(main())

このコードでは、複数の画像を同時にダウンロードし、その後CPU負荷の高い画像処理を並列に実行しています。

エラーハンドリングとリソース管理の工夫

並行処理では、エラー処理とリソース管理も重要です。特に、非同期タスクが失敗しても全体が落ちないようにするために、asyncio.gatherreturn_exceptions=Trueを指定するのが有効です。

results = await asyncio.gather(
    *(fetch(session, u) for u in urls),
    return_exceptions=True
)
for r in results:
    if isinstance(r, Exception):
        print("エラー発生:", r)
    else:
        print("成功:", len(r))

また、プールのサイズを制御することでリソースの使い過ぎを防ぐことができます。例えばProcessPoolExecutor(max_workers=2)と指定すれば、CPU処理が暴走するのを防げます。

おわりに

本記事では、asyncioconcurrent.futuresを組み合わせることで、I/O処理とCPU処理を効率的に並行実行する方法を解説しました。これにより、Web APIの呼び出しとデータ解析を同時に進めるといった、実用的なワークフローをスムーズに構築できます。

今後は、さらに高度な並行処理として、タスクキュー(例:Celery)や分散処理フレームワークと組み合わせることで、大規模システムにも応用できるでしょう。

また、APIの仕様変更やうまく動作しないなどがありましたら、遠慮なくコメントまでお寄せください!

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です