アイリッジ プロダクト開発グループの朴です。pythonでmysqlを扱うための多様なライブラリがありますが、その中、
- pymysql(PyMySQL)
- MySQLdb(mysqlclient)
- mysql.connector(mysql-connector-python)
- aiomysql(aiomysql)
がよく使われているかと思います。
上記に言及したライブラリはmysqlを扱うためのライブラリという共通点がありますが、詳細をみると 実装方法、機能、性能、対応領域が違い、用途に合わせて選んで使うのがいいと思います。
社内ではMySQLdbを主に使っています。 libmysqlclientのpythonラッパであり、安定しているし、特に性能が優れています。 その反面、gevent上ではブロックされるので非同期ioに対応していません。(古いバージョンではwaiterを使うことで可能) それで、monkey patchも効くpymysql、又はc extensionのumysql, ultramysqlをgevent上で使いました。
最近、社内でpython3にシステムの移行と共に、asyncioを使って非同期I/O処理させるパターンも増えて来ました。 それで、mysqlを扱うためのライブラリもaiomysqlを使うようになりました。 このaiomysqlはpymysqlを元にして開発されたasyncio対応のライブラリです。
でも、既存のシステムの改修と性能の面を考慮するとMySQLdbをasyncioと共に使いたいという思いが強く出ました。 それで調査した内容を共有したと思います。
テスト方法
テストは下記の方法で処理がブロックされずに非同期で処理されるか確認することにします。
- SELECT SLEEP(X) で2, 4, 6秒sleepする、3つのクエリをmysqlを投げる
- 処理状態はログで出力
- その後、3つの処理が終わるまでに待つ
- ブロックされなければ6秒で、3つの処理が終了する
テスト環境
python 3.7.9で、下記のようなpackageを使います。
- aiomysql 0.0.21
- mysqlclient 2.0.3
テスト
まず、aiomysqlを使って実装とその結果を見ます。
aiomysql + asyncio
import asyncio import logging import aiomysql logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(process)d(%(thread)s) [%(levelname)s] %(message)s') async def sleep_async(delay): logging.info(f'{delay}-start') async with aiomysql.connect() as dbconn: async with dbconn.cursor() as cursor: await cursor.execute(f'SELECT SLEEP({delay})') logging.info(f'{delay}-end') loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(sleep_async(2), sleep_async(4), sleep_async(6)))
2021-01-22 15:01:04,508 468447(139686273333056) [DEBUG] Using selector: EpollSelector 2021-01-22 15:01:04,508 468447(139686273333056) [INFO] 2-start 2021-01-22 15:01:04,509 468447(139686273333056) [INFO] 4-start 2021-01-22 15:01:04,509 468447(139686273333056) [INFO] 6-start 2021-01-22 15:01:06,515 468447(139686273333056) [INFO] 2-end 2021-01-22 15:01:08,514 468447(139686273333056) [INFO] 4-end 2021-01-22 15:01:10,514 468447(139686273333056) [INFO] 6-end
結果をみると3つとも並行処理され、それぞれ2, 4, 6秒がかかって、トータルで6秒かかって問題なく処理されました。
次はMySQLdbを使って実行してみました。
MySQLdb + asyncio
import asyncio import concurrent.futures from contextlib import closing import logging import MySQLdb as mysql logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(process)d(%(thread)s) [%(levelname)s] %(message)s') def sleep(delay): logging.info(f'{delay}-start') with closing(mysql.connect()) as dbconn: with dbconn.cursor() as cursor: cursor.execute(f'SELECT SLEEP({delay})') logging.info(f'{delay}-end') async def sleep_async(delay): return sleep(delay) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(sleep_async(2), sleep_async(4), sleep_async(6)))
2021-01-22 14:59:30,557 468336(139699468703552) [DEBUG] Using selector: EpollSelector 2021-01-22 14:59:30,558 468336(139699468703552) [INFO] 2-start 2021-01-22 14:59:32,562 468336(139699468703552) [INFO] 2-end 2021-01-22 14:59:32,563 468336(139699468703552) [INFO] 4-start 2021-01-22 14:59:36,564 468336(139699468703552) [INFO] 4-end 2021-01-22 14:59:36,564 468336(139699468703552) [INFO] 6-start 2021-01-22 14:59:42,565 468336(139699468703552) [INFO] 6-end
結果をみると一個ずつ順次的にじっこうされ、トータルで2+4+6で12秒かかったことがわかります。 予想通りにブロックされてしまいました。
ではasyncioではなく、threadを使った並行処理ではどうなるか確認してみます。
MySQLdb + thread
from threading import Thread from contextlib import closing import logging import MySQLdb as mysql logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(process)d(%(thread)s) [%(levelname)s] %(message)s') def sleep(delay): logging.info(f'{delay}-start') with closing(mysql.connect()) as dbconn: with dbconn.cursor() as cursor: cursor.execute(f'SELECT SLEEP({delay})') logging.info(f'{delay}-end') threads = [Thread(target=sleep, args=(i,)) for i in range(2, 7, 2)] for thread in threads: thread.daemon = True thread.start() for thread in threads: thread.join()
2021-01-22 14:59:02,385 468324(140151452448512) [INFO] 2-start 2021-01-22 14:59:02,385 468324(140151444055808) [INFO] 4-start 2021-01-22 14:59:02,385 468324(140151435663104) [INFO] 6-start 2021-01-22 14:59:04,392 468324(140151452448512) [INFO] 2-end 2021-01-22 14:59:06,392 468324(140151444055808) [INFO] 4-end 2021-01-22 14:59:08,391 468324(140151435663104) [INFO] 6-end
結果をみると、トータルで6秒かかって問題なく並行処理できたことが分かります。 ではMySQLdb + asyncioの組み合わせでブロックされずに処理できる方法はないのか?
調べてみたらpythonの公式ドキュメントにそのヒントがありました。 それは、thread又はprocessのpoolで実行することで、concurrent.futures.Executorをベースで実行する方法がありました。 では、executorはNoneにしてdefaultのexecutorが使われるようにしてやってみます。
MySQLdb + asyncio(with run_in_executor())
import asyncio import concurrent.futures from contextlib import closing import logging import MySQLdb as mysql logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(process)d(%(thread)s) [%(levelname)s] %(message)s') def sleep(delay): logging.info(f'{delay}-start') with closing(mysql.connect()) as dbconn: with dbconn.cursor() as cursor: cursor.execute(f'SELECT SLEEP({delay})') logging.info(f'{delay}-end') loop = asyncio.get_event_loop() futures = [loop.run_in_executor(None, sleep, i) for i in range(2, 7, 2)] loop.run_until_complete(asyncio.gather(*futures))
2021-01-22 15:00:05,660 468378(140615479482176) [DEBUG] Using selector: EpollSelector 2021-01-22 15:00:05,663 468378(140615250941696) [INFO] 2-start 2021-01-22 15:00:05,664 468378(140615242548992) [INFO] 4-start 2021-01-22 15:00:05,664 468378(140615234156288) [INFO] 6-start 2021-01-22 15:00:07,669 468378(140615250941696) [INFO] 2-end 2021-01-22 15:00:09,669 468378(140615242548992) [INFO] 4-end 2021-01-22 15:00:11,670 468378(140615234156288) [INFO] 6-end
結果をみると3つとも並行処理され、トータルで6秒かかって問題なく処理されました。 executorをNoneにすると内部的にはThreadPoolExecutorで実行されるようになっていたので、 下記のようにloop処理部分をexecutorを指定する形に変えて実行してみます。
loop = asyncio.get_event_loop() executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) loop.run_until_complete(asyncio.wait({ loop.run_in_executor(executor, sleep, i) for i in range(2, 7, 2) }))
2021-01-22 15:00:29,678 468383(140262040205120) [DEBUG] Using selector: EpollSelector 2021-01-22 15:00:29,679 468383(140261811664640) [INFO] 2-start 2021-01-22 15:00:29,680 468383(140261803271936) [INFO] 4-start 2021-01-22 15:00:29,680 468383(140261794879232) [INFO] 6-start 2021-01-22 15:00:31,686 468383(140261811664640) [INFO] 2-end 2021-01-22 15:00:33,686 468383(140261803271936) [INFO] 4-end 2021-01-22 15:00:35,686 468383(140261794879232) [INFO] 6-end
結果は同一で問題なく処理されました。
3.9からは asyncio.to_thread()という関数でより簡単に使えるようになっています。下記のリンクを参考してください。
https://docs.python.org/3.9/library/asyncio-task.html#running-in-threads
これでMySQLdbをasyncioでブロックされずに使うことを確認してみました。 性能や安定性に問題ないのかも検証したいですね。