[Dd]enzow(ill)? with DB and Python

DBとか資格とかPythonとかの話をつらつらと

Django 2.0 + Channels 2.xを使ってWebsocketを扱う(その4)

前回に引き続きChannelsを触っていきます。前回で一応チャットとして必要なRoomの概念をChannel LayerのGroupで実装しました。今回はその実装を少しいじっていきます。

ConsumerのAsync実装への切り替え

前回実装したconsuemers.ChatConsumerWebsocketConsumerを継承したSync実装でした。そのため、非同期処理であるself.channel_layer.group_add等を使うためにasync_to_syncでラップする必要がありました。

これをAsyncWebsocketConsumerの継承に切り替えて、全体をAsync実装にします。以下はチュートリアルのコードそのままですが、以下のようになります。

# chat/consumers.py
from channels.generic.websocket import AsyncWebsocketConsumer
import json


class ChatConsumer(AsyncWebsocketConsumer):
    """
    WebSocket通信のハンドラ(非同期実装)
    """
    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = 'chat_%s' % self.room_name

        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.accept()

    async def disconnect(self, close_code):
        # Leave room group
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    # Receive message from WebSocket
    async def receive(self, text_data=None, bytes_data=None):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        # Send message to room group
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message
            }
        )

    # Receive message from room group
    async def chat_message(self, event):
        message = event['message']

        # Send message to WebSocket
        await self.send(text_data=json.dumps({
            'message': message
        }))

channel_layer関連の操作をラップする必要はなく、代わりにAynsc Awaitで処理しています。処理内容はそのままですので、チャットとしての機能はかわりません。

この部分のcommitは以下です。

github.com

Sync?Async?

AsyncWebsocketConsumerを継承した非同期実装とWebsocketConsumerを継承した同期実装はどちらがすぐれているのでしょうか。

Rewrite the consumer to be asynchronousを見るとこうあります。

Synchronous consumers are convenient because they can call regular synchronous I/O functions such as those that access Django models without writing special code. However asynchronous consumers can provide a higher level of performance since they don’t need create additional threads when handling requests.

同期実装の場合、Djnago のORM等を用いたI/Oについて特別な考慮点がなくかけるというメリットがあります。一方で非同期実装にした場合、リクエストの処理のため追加でスレッドを生成する必要がないためより良いパフォーマンスが出るということでした。

そのためWebsocketでシビアなパフォーマンスを要求される場合は非同期実装を選び、そこまでの性能要件がない場合は同期実装のほうが適しているようです。

Async実装の場合のORM操作

一応Async実装にした場合、Django ORM等の操作はどのようなコードになるかを確認しておきます。ドキュメント上は以下にあります。

Database Access — Channels 2.1.1 documentation

こんな感じのログ保存用のモデルを作っておきます。

from django.db import models


class ChatLog(models.Model):

    room_name = models.CharField(max_length=100)
    user_name = models.CharField(max_length=100)
    message = models.TextField()
    created_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return 'ChatLog({}:{})'.format(self.room_name, self.user_name)

そしてChatConsumer内でChatLogを保存するようにしていきます。

from channels.db import database_sync_to_async
from chat.models import ChatLog


class ChatConsumer(AsyncWebsocketConsumer):
:
:
    # Receive message from room group
    async def chat_message(self, event):
        message = event['message']
        await self._save_message(message)
        # Send message to WebSocket
        await self.send(text_data=json.dumps({
            'message': message
        }))
:
:
    @database_sync_to_async
    def _save_message(self, message):
        ChatLog.objects.create(
            room_name=self.room_name,
            message=message,
            user_name=self.user.username,
        )

_save_messageというメソッド内でChatLogを作成しています。そしてAsyncにする必要があるので@database_sync_to_asyncアノテーションを付与しています。これでAsync実装のConsumer内でもDjangoのORMが使用できるようになります。なお、アノテーションにせず以下のような使い方でも同じです。

await database_sync_to_async(self._save_message)(message)

メッセージを受け取った際に呼び出されるchat_message内で_save_messageを実行していますが、非同期処理に変換されていますのでChannelLayerの処理と同じようにawaitを付与して実行します。

一応かるく動かしてチャットしてみると、以下のようにモデルインスタンスが生成されています。

>>> from chat.models import ChatLog
>>> ChatLog.objects.all()
<QuerySet [<ChatLog: ChatLog(lobby:aaa)>, <ChatLog: ChatLog(lobby:aaaaa)>]>

まとめ

これで一通りコードとしては完成しました。しかしまだrunserverで起動した組み込みサーバで動作確認をしています。次回はnginx + daphneを使ってWebSocketが動くようにしていきます。