[Dd]enzow(ill)? with DB and Python

DBとか資格とかPythonとかの話をつらつらと

Django Channelsのdatabase_sync_to_asyncを理解しなくて死んだ話

Django Channelsの@database_sync_to_asyncデコレータを正しく理解しなくてえらい目('MySQL server has gone away')にあったので、確認して理解した内容を残しておきます。

ざっくりと

Django Channelsを使うと、DjangoでもWebSocketを使用するアプリケーションを作る事ができます。WebSocket通信の処理はConsumerというコンポーネントにクラスベースで定義していきます。こいつは非同期実装と同期実装の2種類がありますが、パフォーマンス的には非同期実装が優れています。この時、Djangoのモデルに対する処理については@database_sync_to_asyncデコレータをつけるようにマニュアルに記載されています。

https://channels.readthedocs.io/en/latest/topics/databases.html

channels.db.database_sync_to_async is a version of asgiref.sync.sync_to_async that also cleans up database connections on exit.

ここを流し読みしたのでえらい目に会いました。

問題になるコード

以下は抜粋ですが、このConsumerにおける_save_messageが問題になります。

class ChatConsumer(AsyncWebsocketConsumer):
    """
    WebSocket通信のハンドラ(非同期実装)
    """
    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = 'chat_%s' % self.room_name

        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.accept()

    # Receive message from room group
    async def chat_message(self, event):
        message = event['message']
        self._save_message(message)
        await self.send(text_data=json.dumps({
            'message': message
        }))

    # DB操作を伴う処理を含んだメソッド
    # @database_sync_to_async
    def _save_message(self, message):
        ChatLog.objects.create(
            room_name=self.room_name,
            message=message,
        )

_save_messageChatLog.objects.createというコードからわかるように、Djangoのモデルを操作していますので、DBへのコネクションが発生するコードです。チャットルームのログを都度生成するロジックになっています。マニュアルに従えば@database_sync_to_asyncをつけるべきコードです。

なぜ気が付かなかったのか

@database_sync_to_asyncデコレータを付与しないとしても、このコードは正常に稼働します。しかし、DBとのコネクションに問題があります。

まずはチャットルームを開いて処理を適当に行います。その状態でshow processlistをするとこんな感じです。

mysql> show processlist;
+----+--------+------------------+------+---------+------+-------+------------------+
| Id | User   | Host             | db   | Command | Time | State | Info             |
+----+--------+------------------+------+---------+------+-------+------------------+
|  3 | docker | 172.21.0.4:38480 | mydb | Sleep   |    1 |       | NULL             |
|  4 | root   | 172.21.0.1:32878 | mydb | Query   |    0 | init  | show processlist |
+----+--------+------------------+------+---------+------+-------+------------------+
2 rows in set (0.01 sec)

ID:3がDjangoからの処理のようです。さらにもう一つタブを開いて2窓で処理してみます。

mysql> show processlist;
+----+--------+------------------+------+---------+------+-------+------------------+
| Id | User   | Host             | db   | Command | Time | State | Info             |
+----+--------+------------------+------+---------+------+-------+------------------+
|  3 | docker | 172.21.0.4:38480 | mydb | Sleep   |    1 |       | NULL             |
|  4 | root   | 172.21.0.1:32878 | mydb | Query   |    0 | init  | show processlist |
+----+--------+------------------+------+---------+------+-------+------------------+
2 rows in set (0.00 sec)

かわりません。どうやらConsumer全体でアクティブなコネクションがあればそれを使いまわしているように見えます。

とりあえず処理はちゃんとできていました。

何が問題なのか

ここでID:3をKILLしてみます。

mysql> kill 3;
Query OK, 0 rows affected (0.00 sec)

mysql> show processlist;
+----+------+------------------+------+---------+------+-------+------------------+
| Id | User | Host             | db   | Command | Time | State | Info             |
+----+------+------------------+------+---------+------+-------+------------------+
|  4 | root | 172.21.0.1:32878 | mydb | Query   |    0 | init  | show processlist |
+----+------+------------------+------+---------+------+-------+------------------+
1 row in set (0.00 sec)

いなくなりました。新しく窓を開いてチャットを試みるとうまくいきません。Django側に以下のエラーが出ます。

service_1        | 2018-07-29 14:57:58,634 - ERROR - server - Exception inside application: (2006, 'MySQL server has gone away')
service_1        |   File "/usr/local/lib/python3.6/site-packages/channels/consumer.py", line 54, in __call__
service_1        |     await await_many_dispatch([receive, self.channel_receive], self.dispatch)
service_1        |   File "/usr/local/lib/python3.6/site-packages/channels/utils.py", line 48, in await_many_dispatch
service_1        |     await dispatch(result)
service_1        |   File "/usr/local/lib/python3.6/site-packages/channels/consumer.py", line 67, in dispatch
service_1        |     await handler(message)
service_1        |   File "/app/mysite/chat/consumers.py", line 49, in chat_message
service_1        |     self._save_message(message)
service_1        |   File "/app/mysite/chat/consumers.py", line 63, in _save_message
service_1        |     message=message,
service_1        |   File "/usr/local/lib/python3.6/site-packages/django/db/models/manager.py", line 82, in manager_method
service_1        |     return getattr(self.get_queryset(), name)(*args, **kwargs)
service_1        |   File "/usr/local/lib/python3.6/site-packages/django/db/models/query.py", line 417, in create
service_1        |     obj.save(force_insert=True, using=self.db)
service_1        |   File "/usr/local/lib/python3.6/site-packages/django/db/models/base.py", line 729, in save
service_1        |     force_update=force_update, update_fields=update_fields)
service_1        |   File "/usr/local/lib/python3.6/site-packages/django/db/models/base.py", line 756, in save_base
service_1        |     with transaction.atomic(using=using, savepoint=False):
service_1        |   File "/usr/local/lib/python3.6/site-packages/django/db/transaction.py", line 173, in __enter__
service_1        |     connection.set_autocommit(False, force_begin_transaction_with_broken_autocommit=True)
service_1        |   File "/usr/local/lib/python3.6/site-packages/django/db/backends/base/base.py", line 404, in set_autocommit
service_1        |     self._set_autocommit(autocommit)
service_1        |   File "/usr/local/lib/python3.6/site-packages/django/db/backends/mysql/base.py", line 266, in _set_autocommit
service_1        |     self.connection.autocommit(autocommit)
service_1        |   File "/usr/local/lib/python3.6/site-packages/django/db/utils.py", line 89, in __exit__
service_1        |     raise dj_exc_value.with_traceback(traceback) from exc_value
service_1        |   File "/usr/local/lib/python3.6/site-packages/django/db/backends/mysql/base.py", line 266, in _set_autocommit
service_1        |     self.connection.autocommit(autocommit)
service_1        |   File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line 272, in autocommit
service_1        |     _mysql.connection.autocommit(self, on)
service_1        |   (2006, 'MySQL server has gone away')

MySQL server has gone awayは、DBサーバ死んでたりする場合によく見るエラーですが、もちろんプロセスを一つKILLしただけでDBは生きています。先程ID:3を使っていた処理ならわかりますが、新規で開いたタブで発生し以降どれだけページをリロードしてWebsocketを再接続しても治りません。Django自体の再起動が必要でした。

恐らく、Django側ではID:3に対応するConnectionオブジェクトは利用可能とマークされているため、再利用しつづけているような感じです。今回はKILLで再現させましたが、長期間無通信等でコネクションが切断されることはままありえるので、発生し得る大きな問題です。

逆に開発時は、長期間アイドルな状態になりにくいので開発環境では見落としてしまっていました。

@database_sync_to_asyncをつけると?

こんな感じのコードに変えます。

    # Receive message from room group
    async def chat_message(self, event):
        message = event['message']
        # asyncにしているのでawaitする必要がある
        await self._save_message(message)
        await self.send(text_data=json.dumps({
            'message': message
        }))

    @database_sync_to_async
    def _save_message(self, message):
        ChatLog.objects.create(
            room_name=self.room_name,
            message=message,
        )

この状態で処理をした後のMySQLを確認します。

mysql> show processlist;
+----+------+------------------+------+---------+------+-------+------------------+
| Id | User | Host             | db   | Command | Time | State | Info             |
+----+------+------------------+------+---------+------+-------+------------------+
|  4 | root | 172.21.0.1:32878 | mydb | Query   |    0 | init  | show processlist |
+----+------+------------------+------+---------+------+-------+------------------+
1 row in set (0.00 sec)

このようにConsumerからの接続は残っていません。database_sync_to_asyncをつけた場合はデコレートされた処理を抜けた時点で、コネクションの破棄が行われているので接続が残らず前述のような問題は起きません。

都度接続のパフォーマンスはいかがなものかとも思いますが、MySQLはスレッドベースな分コネクションを生成するコストは多少マシだからいいのでしょうか。

ちなみにchannels.db.DatabaseSyncToAsyncのソースはこんな感じでした。

class DatabaseSyncToAsync(SyncToAsync):
    """
    SyncToAsync version that cleans up old database connections when it exits.
    """

    def thread_handler(self, loop, *args, **kwargs):
        try:
            return super().thread_handler(loop, *args, **kwargs)
        finally:
            close_old_connections()


# The class is TitleCased, but we want to encourage use as a callable/decorator
database_sync_to_async = DatabaseSyncToAsync

@sync_to_asyncデコレータを継承し、close_old_connectionsを追加で実行するだけの実装でした。

まとめ

マニュアルちゃんと嫁って話ですが、一応備忘録として残しておきました。ちなみに、結構ありがちなのがbook_instance.author.nameみたいなコードです。これでもコネクションは発生し得るのでdatabase_sync_to_asyncを漏らすと今回の事象に繋がり得るということはちゃんと注意したいです。