async のついていない関数を非同期化して、async のついた非同期関数と同じように扱いたいというときの方法です。

環境
- python 3.8.1
概要
同期関数を非同期化するには run_in_executor
を使用します。
ただし、CPU バウンドな処理を非同期化しても並列では実行されないので、マルチプロセス化などを行う必要があります。 これについては以下の記事が参考になりました。
また、run_in_executor
で実行する関数に名前付き引数を渡す場合、functools.partial
を使う必要があります。
引数で渡せる値は、Python の仕様上 Picklable な値のみだそうです。
ラムダ式やモジュールトップレベル以外で定義された関数やクラスは渡せません
サンプルコード
上記を踏まえたサンプルコードです。
同期処理として CPU バウンドを想定した処理を2つ、非同期処理として I/O バウンドを想定した処理を2つ、計4つの処理を並列で動かしています。
import asyncio
import time
from functools import partial
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
def cpu_bound(index):
# CPU バウンドな処理を想定
count = 0
while count < 1000:
print(f'cpu_bound[{index}] count={count}')
count += 1
time.sleep(0.01)
async def io_bound(index):
# I/O バウンドな処理を想定
count = 0
while count < 1000:
print(f'io_bound[{index}] count={count}')
count += 1
await asyncio.sleep(0.01)
async def main():
loop = asyncio.get_running_loop()
executor = ProcessPoolExecutor(max_workers=2)
tasks = [
loop.run_in_executor(executor, partial(cpu_bound, 0)),
loop.run_in_executor(executor, partial(cpu_bound, 1)),
io_bound(0),
io_bound(1),
]
await asyncio.gather(*tasks)
asyncio.run(main(), debug=True)
ProcessPoolExecutor
により、CPU バウンドな処理はそれぞれ個別のプロセスとして動作させています。
I/O バウンドな処理はどちらもメインプロセスで動作します。
従って、I/O バウンドな処理のほうが少し終わるのが遅くなります。
参考 URL
- https://cpoint-lab.co.jp/article/202208/23186/
- https://pod.hatenablog.com/entry/2019/03/21/162511
- https://hachibeedi.github.io/entry/unify-sync-and-async-function-in-python/
- https://qiita.com/smatsumt/items/d8f290e40077a14210f2
- https://docs.python.org/ja/3/library/pickle.html#what-can-be-pickled-and-unpickled