[Dd]enzow(ill)? with DB and Python

DBとか資格とかPythonとかの話をつらつらと

PythonのUnitTestでメール送信を伴う場合の対応を検討してみた

1年くらい前に自分でかき捨てたコードを久々にメンテすることになったので見ていたらテストコードがなくてさっぱりでした。さてPythonでのテストの場合標準で用意されているunittestを使うことがあると思います。しかし、メール送信を伴う処理の場合、テスト方法に悩みます。そんな場合にどうしたかをまとめておきます。

何が悩むのか?

メール送信を行う処理の場合、テストのためにSMTPサーバが必要です。しかし、テスト用に用意するのは工数がかかりますし、テストを動かせる環境に制限がかかります。また普通のSMTPサーバを用いた場合は、実際に受信してのチェックが必要になり面倒です。

どうしたのか

Pythonでデバッグ用のSMTPサーバを用意し、setUp()で別プロセスとして起動するようにしました。

デバッグ用のSMTPサーバについては以下のコードを参考とさせていただきました。

偏った言語信者の垂れ流し:デバッグ用SMTPサーバ

このコードでは、SMTPサーバが受け取ったメールをファイルとしてローカルに書き出すため、送信した内容と書き出されたファイルの内容が一致するかをチェックするテストコードにしました。

テストコード

だいたいこんな感じになってます。mycode.send_mailがテスト対象のコードです。以下の流れでテストを実装しています。

  • 各テスト起動時にデバッグ用のSMTPサーバを起動
  • デバッグ用のSMTPサーバ宛にメールを送信
  • ファイルが指定ディレクトリに書き出される
  • emlファイルをパースして、件名や内容等をself.assertEqual等する
import unittest
import subprocess
import shutil
import os
from for_debug import MailParser
from mycode import send_mail

# debug SMTPサーバ
PORT = 8025
DUMMY_SMTP_SETTINGS = {
    "SERVER": "localhost",
    "PORT": PORT,
    "ACCOUNT": "",
    "PASSWORD": "",
}
MAIL_SAVE_FOLDER = "/tmp/test"

class Test3Utils(unittest.TestCase):
    def setUp(self):
        """
        各テスト実行時に、ディレクトリを再作成しデバッグ用のSMTPサーバを
        subProcessで起動する
        """
        try:
            shutil.rmtree(MAIL_SAVE_FOLDER)
            os.mkdir(MAIL_SAVE_FOLDER)
        except:
            pass
 
        self.dummy_server = subprocess.Popen(
            [
                "python",
                "debug_smtp_server.py",
                "--outdir",
                "{}".format(MAIL_SAVE_FOLDER)
            ],
            shell=False
        )

    def tearDown(self):
        """
        テスト終了時にデバッグ用のSMTPサーバをKILLする
        """
        self.dummy_server.terminate()
        self.dummy_server.communicate()

    def test_send_mail(self):
 
        # 送信用メソッド
        send_mail.send_normal_mail(
            smtp_settings=DUMMY_SMTP_SETTINGS,
            to_addr_list=["my@address.com"],
        )
        # 書き出し待ちでスリープいれる
        time.sleep(1)
        # 書き出されたメールファイルを取得する
        files = glob.glob(MAIL_SAVE_FOLDER+os.sep+"*eml")
        received_mail = MailParser(files[0], "")
        # メールファイルが1通だけか
        self.assertEqual(len(files), 1)
        # 期待したタイトルであるか
        self.assertEqual(received_mail.subject, "expected mail subject")
:

なお、OS上のemlファイルを解析しているコードは以下のMailParserクラスを実装して使っています。

# coding:utf-8
"""
emlファイルを元に扱いやすい様にデータを取得する
サンプル。

あくまでい最低限の実装のため考慮漏れなどがあるかも。。。
"""
import sys
import email
from email.header import decode_header

class MailParser(object):
    """
    メールファイルのパスを受け取り、それを解析するクラス
    """

    def __init__(self, mail_file_path):
        self.mail_file_path = mail_file_path
        # emlファイルからemail.message.Messageインスタンスの取得
        with open(mail_file_path, 'rb') as email_file:
            self.email_message = email.message_from_bytes(email_file.read())
        self.subject = None
        self.to_address = None
        self.cc_address = None
        self.from_address = None
        self.body = ""
        # 添付ファイル関連の情報
        # {name: file_name, data: data}
        self.attach_file_list = []
        # emlの解釈
        self._parse()

    def _parse(self):
        """
        メールファイルの解析
        __init__内で呼び出している
        """
        self.subject = self._get_decoded_header("Subject")
        self.to_address = self._get_decoded_header("To")
        self.cc_address = self._get_decoded_header("Cc")
        self.from_address = self._get_decoded_header("From")

        # メッセージ本文部分の処理
        for part in self.email_message.walk():
            # ContentTypeがmultipartの場合は実際のコンテンツはさらに
            # 中のpartにあるので読み飛ばす
            if part.get_content_maintype() == 'multipart':
                continue
            # ファイル名の取得
            attach_fname = part.get_filename()
            # ファイル名がない場合は本文のはず
            if not attach_fname:
                charset = str(part.get_content_charset())
                if charset:
                    self.body += part.get_payload(decode=True).decode(charset, errors="replace")
                else:
                    self.body += part.get_payload(decode=True)
            else:
                # ファイル名があるならそれは添付ファイルなので
                # データを取得する
                self.attach_file_list.append({
                    "name": attach_fname,
                    "data": part.get_payload(decode=True)
                })

    def _get_decoded_header(self, key_name):
        """
        ヘッダーオブジェクトからデコード済の結果を取得する
        """
        ret = ""

        # 該当項目がないkeyは空文字を戻す
        raw_obj = self.email_message.get(key_name)
        if raw_obj is None:
            return ""
        # デコードした結果をunicodeにする
        for fragment, encoding in decode_header(raw_obj):
            if not hasattr(fragment, "decode"):
                ret += fragment
                continue
            # encodeがなければとりあえずUTF-8でデコードする
            if encoding:
                ret += fragment.decode(encoding)
            else:
                ret += fragment.decode("UTF-8")
        return ret

まとめ

果たして受信待機が1秒でいいのかとか、もっと他の方法があるんじゃないかという気持ちもありますが、いまのところ順調に動いているので満足です。どなたかの参考になれば幸いです。