金子邦彦研究室プログラミングPythonPython で,ファイル更新を監視

Python で,ファイル更新を監視

目次

  1. 前準備
  2. watchdog のインストール手順
  3. ソースコード

前準備

Python の準備(Windows,Ubuntu 上)

サイト内の関連ページ

関連する外部ページ

Python の公式ページ: https://www.python.org/

Python のまとめ: 別ページ »にまとめ

Python の公式ページ: https://www.python.org/

watchdog のインストール手順

  1. Windows では,コマンドプロンプト管理者として実行する.

    [image]
  2. 次のコマンドを実行

    Windows では「python」,Ubuntu では「sudo python3」.

    python -m pip install -U watchdog
    

    [image]

ソースコード

# -*- coding: utf-8 -*-
# 事前準備
#   pip install -U watchdog
# 使い方
#    引数でディレクトリを指定.引数がないときは,
#   このプログラムが置かれているディレクトリ(「os.path.abspath(os.path.dirname(__file__))」で取得)を使用
#   そのディレクトリの中の
#   ファイル名 log.txt (ファイル名は FILENAME で設定) の更新を監視する
#    プログラムを止めるには CTRL + C (同時押し)
# 参考Webページ
#    https://pythonhosted.org/watchdog/quickstart.html#a-simple-example

import sys
import time
import os
import hashlib
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# 監視するファイルのファイル名
FILENAME = 'log.txt'

class ChangeHandler(FileSystemEventHandler):
    def __init__(self):
        with open(FILENAME, 'rb') as f:
            # 最後のイベント時でのファイルの md5 ハッシュ値を入れておく
            self.oldmd5 = hashlib.md5(f.read()).hexdigest()
            # 最後のイベント時でのファイルの pos 値を入れておく
            self.pos = f.tell()

    def on_modified(self, event):
        filename = os.path.basename(event.src_path)
        if event.is_directory:
            return
        if filename in FILENAME:
            # ファイルの中身の変更を確認するために md5 ハッシュを使う
            with open(filename, 'rb') as f:
                mymd5 = hashlib.md5(f.read()).hexdigest()
            if self.oldmd5 != mymd5:
                self.oldmd5 = mymd5
                print('%s' % event.src_path)
            # ファイルに「追記」しか行わないものとする
            with open(filename, 'r') as f:
                f.seek(self.pos)
                dat = f.read()
                print(dat)
                self.pos = f.tell()

if __name__ == '__main__':
    # ディレクトリの取得と,カレントディレクトリの変更
    if len(sys.argv) > 1:
        path = sys.argv[1]
        os.chdir(path)
    else:
        path = os.path.abspath(os.path.dirname(__file__))

    event_handler = ChangeHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()