Pythonでは非同期処理を行う方法がいくつか存在します。その中でも、asyncio
は非同期I/Oに強く、concurrent.futures
はCPU負荷の高い処理やブロッキング処理に向いています。しかし、両者を別々に使うだけでは、WebリクエストのようなI/O処理と、画像処理や数値計算のようなCPU処理を効率よく同時に扱うことは難しいです。
本記事では、asyncio
とconcurrent.futures
を組み合わせることで、I/OとCPU処理を同時に走らせる設計を実践的に解説していきます。特に、小規模事業者や個人開発者がWebスクレイピングやデータ処理を効率化する際に役立つ内容です。
まずは、それぞれのライブラリの役割を整理します。
- asyncio: イベントループを用いた非同期I/O処理。ネットワーク通信やファイルI/Oのような待ち時間の多い処理に適しています。
- concurrent.futures: スレッドまたはプロセスプールを提供し、CPU処理やブロッキング関数を並列化できます。
例えば、Web APIからデータを取得して、取得したデータを画像処理や解析に回すケースを考えると、API呼び出しはasyncio
で、解析処理はProcessPoolExecutor
で並行実行するのが効率的です。
import asyncio
from concurrent.futures import ProcessPoolExecutor
def cpu_bound_task(x: int) -> int:
# 仮の重い計算処理
return sum(i * i for i in range(10**6)) + x
async def main():
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_bound_task, 42)
print("計算結果:", result)
asyncio.run(main())
上記コードでは、CPUに重い処理をProcessPoolExecutor
に投げつつ、非同期ループの中で結果を待つことができます。
asyncio
のイベントループはI/Oタスクを効率的に扱いますが、CPU処理をそのまま書くとループをブロックしてしまいます。そこで、loop.run_in_executor
を利用してCPUタスクを外部のスレッド/プロセスプールに渡し、I/Oタスクと共存させるのが重要です。
次の例では、複数のAPIリクエストを非同期に処理しつつ、その結果をCPU集約的な処理に流しています。
import asyncio
import aiohttp
from concurrent.futures import ProcessPoolExecutor
def heavy_analysis(data: str) -> str:
return f"分析済み: {len(data)}文字"
async def fetch(session, url):
async with session.get(url) as resp:
return await resp.text()
async def main():
urls = ["https://example.com", "https://httpbin.org/get"]
async with aiohttp.ClientSession() as session:
texts = await asyncio.gather(*(fetch(session, u) for u in urls))
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
results = await asyncio.gather(
*(loop.run_in_executor(pool, heavy_analysis, t) for t in texts)
)
print(results)
asyncio.run(main())
このように設計することで、ネットワークI/OとCPU処理を効率的に両立できます。
より実用的な例として、Webから画像を取得して、その場でリサイズ処理を行うケースを考えます。Pillow
を利用し、I/OとCPU処理を同時に扱う流れを紹介します。
import asyncio
import aiohttp
from concurrent.futures import ProcessPoolExecutor
from PIL import Image
import io
def resize_image(data: bytes, size=(128, 128)):
img = Image.open(io.BytesIO(data))
return img.resize(size)
async def download_image(session, url):
async with session.get(url) as resp:
return await resp.read()
async def main():
urls = ["https://httpbin.org/image/png", "https://httpbin.org/image/jpeg"]
async with aiohttp.ClientSession() as session:
images = await asyncio.gather(*(download_image(session, u) for u in urls))
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
resized = await asyncio.gather(
*(loop.run_in_executor(pool, resize_image, img) for img in images)
)
for i, img in enumerate(resized):
img.save(f"resized_{i}.png")
asyncio.run(main())
このコードでは、複数の画像を同時にダウンロードし、その後CPU負荷の高い画像処理を並列に実行しています。
並行処理では、エラー処理とリソース管理も重要です。特に、非同期タスクが失敗しても全体が落ちないようにするために、asyncio.gather
にreturn_exceptions=True
を指定するのが有効です。
results = await asyncio.gather(
*(fetch(session, u) for u in urls),
return_exceptions=True
)
for r in results:
if isinstance(r, Exception):
print("エラー発生:", r)
else:
print("成功:", len(r))
また、プールのサイズを制御することでリソースの使い過ぎを防ぐことができます。例えばProcessPoolExecutor(max_workers=2)
と指定すれば、CPU処理が暴走するのを防げます。
本記事では、asyncio
とconcurrent.futures
を組み合わせることで、I/O処理とCPU処理を効率的に並行実行する方法を解説しました。これにより、Web APIの呼び出しとデータ解析を同時に進めるといった、実用的なワークフローをスムーズに構築できます。
今後は、さらに高度な並行処理として、タスクキュー(例:Celery)や分散処理フレームワークと組み合わせることで、大規模システムにも応用できるでしょう。
また、APIの仕様変更やうまく動作しないなどがありましたら、遠慮なくコメントまでお寄せください!