アイリッジ開発者ブログ

アイリッジに所属するエンジニアが技術情報を発信していきます。

MySQLdb(mysqlclient)をasyncioでブロックされずに使う

f:id:iridge-tech:20210420183650p:plain

アイリッジ プロダクト開発グループの朴です。pythonでmysqlを扱うための多様なライブラリがありますが、その中、

がよく使われているかと思います。

上記に言及したライブラリはmysqlを扱うためのライブラリという共通点がありますが、詳細をみると 実装方法、機能、性能、対応領域が違い、用途に合わせて選んで使うのがいいと思います。

社内ではMySQLdbを主に使っています。 libmysqlclientのpythonラッパであり、安定しているし、特に性能が優れています。 その反面、gevent上ではブロックされるので非同期ioに対応していません。(古いバージョンではwaiterを使うことで可能) それで、monkey patchも効くpymysql、又はc extensionのumysql, ultramysqlをgevent上で使いました。

最近、社内でpython3にシステムの移行と共に、asyncioを使って非同期I/O処理させるパターンも増えて来ました。 それで、mysqlを扱うためのライブラリもaiomysqlを使うようになりました。 このaiomysqlはpymysqlを元にして開発されたasyncio対応のライブラリです。

でも、既存のシステムの改修と性能の面を考慮するとMySQLdbをasyncioと共に使いたいという思いが強く出ました。 それで調査した内容を共有したと思います。

テスト方法

テストは下記の方法で処理がブロックされずに非同期で処理されるか確認することにします。

  • SELECT SLEEP(X) で2, 4, 6秒sleepする、3つのクエリをmysqlを投げる
  • 処理状態はログで出力
  • その後、3つの処理が終わるまでに待つ
  • ブロックされなければ6秒で、3つの処理が終了する

テスト環境

python 3.7.9で、下記のようなpackageを使います。

  • aiomysql 0.0.21
  • mysqlclient 2.0.3

テスト

まず、aiomysqlを使って実装とその結果を見ます。

aiomysql + asyncio

import asyncio
import logging

import aiomysql

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(process)d(%(thread)s) [%(levelname)s] %(message)s')


async def sleep_async(delay):
    logging.info(f'{delay}-start')
    async with aiomysql.connect() as dbconn:
        async with dbconn.cursor() as cursor:
            await cursor.execute(f'SELECT SLEEP({delay})')
    logging.info(f'{delay}-end')


loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(sleep_async(2), sleep_async(4), sleep_async(6)))
2021-01-22 15:01:04,508 468447(139686273333056) [DEBUG] Using selector: EpollSelector
2021-01-22 15:01:04,508 468447(139686273333056) [INFO] 2-start
2021-01-22 15:01:04,509 468447(139686273333056) [INFO] 4-start
2021-01-22 15:01:04,509 468447(139686273333056) [INFO] 6-start
2021-01-22 15:01:06,515 468447(139686273333056) [INFO] 2-end
2021-01-22 15:01:08,514 468447(139686273333056) [INFO] 4-end
2021-01-22 15:01:10,514 468447(139686273333056) [INFO] 6-end

結果をみると3つとも並行処理され、それぞれ2, 4, 6秒がかかって、トータルで6秒かかって問題なく処理されました。

次はMySQLdbを使って実行してみました。

MySQLdb + asyncio

import asyncio
import concurrent.futures
from contextlib import closing
import logging

import MySQLdb as mysql

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(process)d(%(thread)s) [%(levelname)s] %(message)s')


def sleep(delay):
    logging.info(f'{delay}-start')
    with closing(mysql.connect()) as dbconn:
        with dbconn.cursor() as cursor:
            cursor.execute(f'SELECT SLEEP({delay})')
    logging.info(f'{delay}-end')


async def sleep_async(delay):
    return sleep(delay)


loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(sleep_async(2), sleep_async(4), sleep_async(6)))
2021-01-22 14:59:30,557 468336(139699468703552) [DEBUG] Using selector: EpollSelector
2021-01-22 14:59:30,558 468336(139699468703552) [INFO] 2-start
2021-01-22 14:59:32,562 468336(139699468703552) [INFO] 2-end
2021-01-22 14:59:32,563 468336(139699468703552) [INFO] 4-start
2021-01-22 14:59:36,564 468336(139699468703552) [INFO] 4-end
2021-01-22 14:59:36,564 468336(139699468703552) [INFO] 6-start
2021-01-22 14:59:42,565 468336(139699468703552) [INFO] 6-end

結果をみると一個ずつ順次的にじっこうされ、トータルで2+4+6で12秒かかったことがわかります。 予想通りにブロックされてしまいました。

ではasyncioではなく、threadを使った並行処理ではどうなるか確認してみます。

MySQLdb + thread

from threading import Thread
from contextlib import closing
import logging

import MySQLdb as mysql

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(process)d(%(thread)s) [%(levelname)s] %(message)s')


def sleep(delay):
    logging.info(f'{delay}-start')
    with closing(mysql.connect()) as dbconn:
        with dbconn.cursor() as cursor:
            cursor.execute(f'SELECT SLEEP({delay})')
    logging.info(f'{delay}-end')


threads = [Thread(target=sleep, args=(i,)) for i in range(2, 7, 2)]
for thread in threads:
    thread.daemon = True
    thread.start()
for thread in threads:
    thread.join()
2021-01-22 14:59:02,385 468324(140151452448512) [INFO] 2-start
2021-01-22 14:59:02,385 468324(140151444055808) [INFO] 4-start
2021-01-22 14:59:02,385 468324(140151435663104) [INFO] 6-start
2021-01-22 14:59:04,392 468324(140151452448512) [INFO] 2-end
2021-01-22 14:59:06,392 468324(140151444055808) [INFO] 4-end
2021-01-22 14:59:08,391 468324(140151435663104) [INFO] 6-end

結果をみると、トータルで6秒かかって問題なく並行処理できたことが分かります。 ではMySQLdb + asyncioの組み合わせでブロックされずに処理できる方法はないのか?

調べてみたらpythonの公式ドキュメントにそのヒントがありました。 それは、thread又はprocessのpoolで実行することで、concurrent.futures.Executorをベースで実行する方法がありました。 では、executorはNoneにしてdefaultのexecutorが使われるようにしてやってみます。

MySQLdb + asyncio(with run_in_executor())

import asyncio
import concurrent.futures
from contextlib import closing
import logging

import MySQLdb as mysql

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(process)d(%(thread)s) [%(levelname)s] %(message)s')


def sleep(delay):
    logging.info(f'{delay}-start')
    with closing(mysql.connect()) as dbconn:
        with dbconn.cursor() as cursor:
            cursor.execute(f'SELECT SLEEP({delay})')
    logging.info(f'{delay}-end')


loop = asyncio.get_event_loop()
futures = [loop.run_in_executor(None, sleep, i) for i in range(2, 7, 2)]
loop.run_until_complete(asyncio.gather(*futures))
2021-01-22 15:00:05,660 468378(140615479482176) [DEBUG] Using selector: EpollSelector
2021-01-22 15:00:05,663 468378(140615250941696) [INFO] 2-start
2021-01-22 15:00:05,664 468378(140615242548992) [INFO] 4-start
2021-01-22 15:00:05,664 468378(140615234156288) [INFO] 6-start
2021-01-22 15:00:07,669 468378(140615250941696) [INFO] 2-end
2021-01-22 15:00:09,669 468378(140615242548992) [INFO] 4-end
2021-01-22 15:00:11,670 468378(140615234156288) [INFO] 6-end

結果をみると3つとも並行処理され、トータルで6秒かかって問題なく処理されました。 executorをNoneにすると内部的にはThreadPoolExecutorで実行されるようになっていたので、 下記のようにloop処理部分をexecutorを指定する形に変えて実行してみます。

loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
loop.run_until_complete(asyncio.wait({
    loop.run_in_executor(executor, sleep, i) for i in range(2, 7, 2)
}))
2021-01-22 15:00:29,678 468383(140262040205120) [DEBUG] Using selector: EpollSelector
2021-01-22 15:00:29,679 468383(140261811664640) [INFO] 2-start
2021-01-22 15:00:29,680 468383(140261803271936) [INFO] 4-start
2021-01-22 15:00:29,680 468383(140261794879232) [INFO] 6-start
2021-01-22 15:00:31,686 468383(140261811664640) [INFO] 2-end
2021-01-22 15:00:33,686 468383(140261803271936) [INFO] 4-end
2021-01-22 15:00:35,686 468383(140261794879232) [INFO] 6-end

結果は同一で問題なく処理されました。

3.9からは asyncio.to_thread()という関数でより簡単に使えるようになっています。下記のリンクを参考してください。

https://docs.python.org/3.9/library/asyncio-task.html#running-in-threads

これでMySQLdbをasyncioでブロックされずに使うことを確認してみました。 性能や安定性に問題ないのかも検証したいですね。