プログラムを中心とした個人的なメモ用のブログです。 タイトルは迷走中。
内容の保証はできませんのであしからずご了承ください。

2023/04/04

[Python] マルチプロセスにおけるロギング (QueueHandler, QueueListener)

update2023/04/05 event_note2023/04/04 0:58

SyslogHandler で UDP でログを記録していたのですが、マルチプロセスでは Socket 通信のところでエラーが発生しました。
マルチプロセスでのロギングでは、QueueHandlerQueueListner を使ってログを一箇所に集約して書き込むようにする必要があります。

環境

  • Python 3.8.10

基本的な書き方

とりあえずマルチプロセスは置いておいて、QueueHandlerQueueListener の基本的な使い方です。

import os
from multiprocessing import Queue
from logging import StreamHandler, handlers, basicConfig, getLogger, Formatter, DEBUG

# 標準出力の設定
stream_handler = StreamHandler()
stream_handler.setFormatter(Formatter('%(asctime)s [%(levelname)s] [%(name)s] %(message)s'))
# Syslog 出力の設定
syslog_handler = handlers.SysLogHandler(address=('localhost', 514))
syslog_handler.setFormatter(Formatter(f'{os.environ.get("HOSTNAME")} ris-main: %(message)s'))
# ファイル出力の設定
file_handler = handlers.TimedRotatingFileHandler('log/sample.log')
file_handler.setFormatter(Formatter(fmt='%(asctime)s.%(msecs)03d [%(levelname)s] [%(name)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
# キューハンドラの設定
que = Queue()
queue_handler = handlers.QueueHandler(que)
basicConfig(level=DEBUG, handlers=[queue_handler])

# リスナーの作成と開始
queue_listener = handlers.QueueListener(que, *[stream_handler, syslog_handler, file_handler])
queue_listener.start()

logger = getLogger(__name__)

logger.info('this is info')
logger.debug('this is debug')
logger.critical('this is critical')

queue_listener.stop()

logger を使って書いたログは QueueHandler によりキューに入れられます。
QueueListner はキューに溜まったログを取り出して、StreamHandler SyslogHandler TimedRotatingFileHandler で記録されます。

尚、上記は basicConfig を使って設定していますので、ライブラリなどのロギングにも影響します(対処方法は後述)。

マルチプロセスで試してみた

上記を踏まえ、プロセスを3つ立ち上げ、各プロセス100回ログを記録させてみたサンプルが以下です。

import os
from multiprocessing import Process, Queue
from logging import StreamHandler, handlers, basicConfig, getLogger, Formatter, DEBUG

def hoge(index):
    logger = getLogger(__name__)

    for count in range(100):
        logger.info(f'({index}) this is info')
        logger.debug(f'({index}) this is debug')
        logger.critical(f'({index}) this is critical')

# 標準出力の設定
stream_handler = StreamHandler()
stream_handler.setFormatter(Formatter('%(asctime)s [%(levelname)s] [%(name)s] %(message)s'))
# Syslog 出力の設定
syslog_handler = handlers.SysLogHandler(address=('localhost', 514))
syslog_handler.setFormatter(Formatter(f'{os.environ.get("HOSTNAME")} ris-main: %(message)s'))
# ファイル出力の設定
file_handler = handlers.TimedRotatingFileHandler('log/sample.log')
file_handler.setFormatter(Formatter(fmt='%(asctime)s.%(msecs)03d [%(levelname)s] [%(name)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
# キューハンドラの設定
que = Queue()
queue_handler = handlers.QueueHandler(que)
basicConfig(level=DEBUG, handlers=[queue_handler])

# リスナーの作成と開始
queue_listener = handlers.QueueListener(que, *[stream_handler, syslog_handler, file_handler])
queue_listener.start()

# プロセスの立ち上げ
processes = []
for i in range(3):
    p = Process(target=hoge, args=(i,))
    p.start()
    processes.append(p)

# プロセスの終了を待つ
for process in processes:
    try:
        process.join()
    except KeyboardInterrupt:
        process.join(1)

queue_listener.stop()

問題なくログが記録できていることが確認できました。

basicConfig を使わずに設定する

上述したように、basicConfig を使うとライブラリなどの全てのモジュールに影響するので、logger に対して設定を行うようにします。
ただ、そうなると都度設定を行う必要が出てくるので、以下のページを参考に共通の関数を作成しました。

ただし、QueueHandler を使う場合、キューを渡してやる必要があるので、それはグローバル変数として用意することにしました。
ここらへん、もっと良い実装方法はないかなぁとは思いますが・・・。

以下、サンプルです。

# mylogger.py
import multiprocessing
import logging
import logging.handlers

log_queue = multiprocessing.Queue()

def getLogger(modname) -> logging.Logger:
    """ 共通のロガーを作成するための関数
        logging.getLogger と簡単に差し替えられるように同じ関数名としておく
    """

    # キューハンドラの設定
    queue_handler = logging.handlers.QueueHandler(log_queue)
    logger = logging.getLogger(modname)
    logger.addHandler(queue_handler)
    logger.setLevel(logging.DEBUG)
    return logger
# main.py
import os
from logging import handlers, StreamHandler, Formatter, DEBUG, INFO
from mylogger import getLogger, log_queue

# 標準出力の設定
stream_handler = StreamHandler()
stream_handler.setFormatter(Formatter('%(asctime)s [%(levelname)s] [%(name)s] %(message)s'))
stream_handler.setLevel(DEBUG)
# Syslog 出力の設定
syslog_handler = handlers.SysLogHandler(address=('localhost', 514))
syslog_handler.setFormatter(Formatter(f'{os.environ.get("HOSTNAME")} ris-main: %(message)s'))
syslog_handler.setLevel(INFO)
# ファイル出力の設定
file_handler = handlers.TimedRotatingFileHandler('log/sample.log')
file_handler.setFormatter(Formatter(fmt='%(asctime)s.%(msecs)03d [%(levelname)s] [%(name)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
file_handler.setLevel(INFO)

# リスナーの作成と開始
queue_listener = handlers.QueueListener(
    log_queue,
    *[stream_handler, syslog_handler, file_handler],
    respect_handler_level=True)
queue_listener.start()

logger = getLogger(__name__)

logger.info('this is info')
logger.debug('this is debug')
logger.critical('this is critical')

queue_listener.stop()

また、ここでは標準出力とそれ以外で、出力するログレベルを変えています。
この場合、QueueHandlerrespect_handler_levelTrue にして、各ハンドラー側のレベルを優先するように設定してやる必要があります。