MediaPipeによる人物検出(ソースコードと実行結果)

Python開発環境,ライブラリ類

ここでは、最低限の事前準備について説明する。機械学習や深層学習を行う場合は、NVIDIA CUDA、Visual Studio、Cursorなどを追加でインストールすると便利である。これらについては別ページ https://www.kkaneko.jp/cc/dev/aiassist.htmlで詳しく解説しているので、必要に応じて参照してください。

Python 3.12 のインストール

インストール済みの場合は実行不要。

管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要である。

REM Python をシステム領域にインストール
winget install --scope machine --id Python.Python.3.12 -e --silent
REM Python のパス設定
set "PYTHON_PATH=C:\Program Files\Python312"
set "PYTHON_SCRIPTS_PATH=C:\Program Files\Python312\Scripts"
echo "%PATH%" | find /i "%PYTHON_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_PATH%" /M >nul
echo "%PATH%" | find /i "%PYTHON_SCRIPTS_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_SCRIPTS_PATH%" /M >nul

関連する外部ページ

Python の公式ページ: https://www.python.org/

AI エディタ Windsurf のインストール

Pythonプログラムの編集・実行には、AI エディタの利用を推奨する。ここでは,Windsurfのインストールを説明する。

管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行して、Windsurfをシステム全体にインストールする。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要となる。

winget install --scope machine Codeium.Windsurf -e --silent

関連する外部ページ

Windsurf の公式ページ: https://windsurf.com/

必要なライブラリのインストール

コマンドプロンプトを管理者として実行(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する


pip install mediapipe opencv-python numpy pillow

MediaPipe人物検出プログラム

概要

このプログラムは、動画フレームから人物を検出し、その位置をバウンディングボックスで特定する。

ソースコード


# プログラム名: MediaPipe人物検出プログラム
# 特徴技術名: MediaPipe
# 出典: MediaPipe Tasks - Google
# 特徴機能: MediaPipe Tasks Object Detectorによる2Dオブジェクト検出。リアルタイムで動作する軽量なオブジェクト検出で人物検出に特化
# 学習済みモデル: EfficientDet-Lite0事前学習済みモデル(COCO 80クラスからPersonクラスのみ抽出)
# 方式設計:
#   - 関連利用技術:
#     - MediaPipe: Googleが開発したマルチプラットフォーム機械学習ソリューション
#     - OpenCV: 画像処理、カメラ制御、描画処理、動画入出力管理
#   - 入力と出力: 入力: 動画(ユーザは「0:動画ファイル,1:カメラ,2:サンプル動画」のメニューで選択.0:動画ファイルの場合はtkinterでファイル選択.1の場合はOpenCVでカメラが開く.2の場合はhttps://raw.githubusercontent.com/opencv/opencv/master/samples/data/vtest.aviを使用)、出力: OpenCV画面でリアルタイム表示(検出した人物をバウンディングボックスで表示)、1秒間隔でprint()による処理結果表示、プログラム終了時にresult.txtファイルに保存
#   - 処理手順: 1.フレーム取得、2.MediaPipe推論実行、3.Personクラスのフィルタリング、4.信頼度閾値による選別、5.バウンディングボックス描画
#   - 前処理、後処理: 前処理:MediaPipe内部で自動実行。後処理:信頼度による閾値フィルタリングのみ実施
#   - 追加処理: 検出結果の信頼度降順ソートにより重要な検出を優先表示
#   - 調整を必要とする設定値: CONF_THRESH(人物検出信頼度閾値、デフォルト0.5)- 値を上げると誤検出が減少するが検出漏れが増加
# 将来方策: CONF_THRESHの動的調整機能。フレーム毎の検出数を監視し、検出数が閾値を超えた場合は信頼度を上げ、検出数が少ない場合は下げる適応的制御の実装
# その他の重要事項: Windows環境専用設計、初回実行時は学習済みモデルの自動ダウンロード
# 前準備:
#   - pip install mediapipe opencv-python numpy pillow

import cv2
import tkinter as tk
from tkinter import filedialog
import os
import numpy as np
import warnings
import time
import urllib.request
from PIL import Image, ImageDraw, ImageFont
import mediapipe as mp

warnings.filterwarnings('ignore')

# ===== 設定・定数管理 =====
# MediaPipe設定
BaseOptions = mp.tasks.BaseOptions
ObjectDetector = mp.tasks.vision.ObjectDetector
ObjectDetectorOptions = mp.tasks.vision.ObjectDetectorOptions
VisionRunningMode = mp.tasks.vision.RunningMode

# モデル選択(0, 2, ssdから選択可能)
MODEL_SIZE = '0'  # 使用するモデルサイズ(0=EfficientDet-Lite0, 2=EfficientDet-Lite2, ssd=SSD MobileNetV2)

# モデル情報
MODEL_INFO = {
    '0': {
        'name': 'EfficientDet-Lite0',
        'desc': 'バランス型(推奨)',
        'url': 'https://storage.googleapis.com/mediapipe-models/object_detector/efficientdet_lite0/int8/1/efficientdet_lite0.tflite',
        'file': 'efficientdet_lite0.tflite'
    },
    '2': {
        'name': 'EfficientDet-Lite2',
        'desc': '高精度',
        'url': 'https://storage.googleapis.com/mediapipe-models/object_detector/efficientdet_lite2/int8/1/efficientdet_lite2.tflite',
        'file': 'efficientdet_lite2.tflite'
    },
    'ssd': {
        'name': 'SSD MobileNetV2',
        'desc': '最軽量',
        'url': 'https://storage.googleapis.com/mediapipe-models/object_detector/ssd_mobilenet_v2/float32/1/ssd_mobilenet_v2.tflite',
        'file': 'ssd_mobilenet_v2.tflite'
    }
}

MODEL_URL = MODEL_INFO[MODEL_SIZE]['url']
MODEL_PATH = MODEL_INFO[MODEL_SIZE]['file']

SAMPLE_URL = 'https://raw.githubusercontent.com/opencv/opencv/master/samples/data/vtest.avi'
SAMPLE_FILE = 'vtest.avi'
RESULT_FILE = 'result.txt'

# カメラ設定
WINDOW_WIDTH = 1280  # カメラ解像度幅
WINDOW_HEIGHT = 720  # カメラ解像度高さ
FPS = 30  # フレームレート

# 検出パラメータ(調整可能)
CONF_THRESH = 0.5  # 人物検出信頼度閾値(0.0-1.0)
PERSON_CLASS_NAME = 'person'  # COCOデータセットにおけるPersonクラスの名前

# 表示設定
PRINT_INTERVAL = 1.0  # 結果出力間隔(秒)

# 人物の表示色(BGR形式)
PERSON_COLOR = (0, 255, 0)  # 緑(バウンディングボックス用)


# プログラム概要表示
print('=== MediaPipe人物検出プログラム ===')
print('概要: リアルタイムで人物を検出し、バウンディングボックスで表示します')
print('機能: MediaPipeによる人物検出(COCOデータセットのPersonクラス)')
print('操作: qキーで終了')
print('出力: 1秒間隔での処理結果表示、終了時にresult.txt保存')
print()

# システム初期化
print('システム初期化中...')
start_time = time.time()

# モデルダウンロード
if not os.path.exists(MODEL_PATH):
    print(f'{MODEL_INFO[MODEL_SIZE]["name"]}モデルをダウンロード中...')
    try:
        urllib.request.urlretrieve(MODEL_URL, MODEL_PATH)
        print('モデルのダウンロードが完了しました')
    except Exception as e:
        print(f'モデルのダウンロードに失敗しました: {e}')
        exit()

# MediaPipeモデル初期化
detector = None
try:
    print(f'MediaPipe Object Detector {MODEL_INFO[MODEL_SIZE]["name"]}モデルを初期化中...')
    options = ObjectDetectorOptions(
        base_options=BaseOptions(model_asset_path=MODEL_PATH),
        running_mode=VisionRunningMode.VIDEO,
        score_threshold=CONF_THRESH,
        max_results=-1
    )
    detector = ObjectDetector.create_from_options(options)
    print(f'MediaPipe Object Detector {MODEL_INFO[MODEL_SIZE]["name"]}モデルの初期化が完了しました')
    print(f'モデル: {MODEL_INFO[MODEL_SIZE]["name"]} ({MODEL_INFO[MODEL_SIZE]["desc"]})')
except Exception as e:
    print(f'MediaPipe Object Detector {MODEL_INFO[MODEL_SIZE]["name"]}モデルの初期化に失敗しました')
    print(f'エラー: {e}')
    exit()

print('初期化完了')
print()

# グローバル変数
frame_count = 0
last_print_time = time.time()
results_log = []


def video_processing(frame, timestamp_us):
    """フレーム処理メイン関数"""
    global frame_count, last_print_time, results_log

    frame_count += 1

    # MediaPipeで検出実行
    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame)
    detection_result = detector.detect_for_video(mp_image, timestamp_us)

    persons = []
    total_confidence = 0

    if detection_result.detections:
        for detection in detection_result.detections:
            # カテゴリーからpersonクラスを探す
            for category in detection.categories:
                if category.category_name == PERSON_CLASS_NAME:
                    # バウンディングボックスの座標を取得
                    bbox = detection.bounding_box
                    x1 = bbox.origin_x
                    y1 = bbox.origin_y
                    x2 = x1 + bbox.width
                    y2 = y1 + bbox.height

                    person_data = {
                        'box': (x1, y1, x2, y2),
                        'detection_conf': category.score
                    }
                    persons.append(person_data)
                    total_confidence += category.score
                    break

    # 信頼度でソート(降順)
    persons.sort(key=lambda x: x['detection_conf'], reverse=True)

    # 人影・人の気配判定ロジック
    person_count = len(persons)
    if person_count == 0:
        presence_level = '気配なし'
    elif person_count == 1 and total_confidence < 0.7:
        presence_level = '人影の可能性'
    elif person_count >= 1 and total_confidence >= 0.7:
        presence_level = '人の存在確認'

    # 1秒間隔での出力
    current_time = time.time()
    if current_time - last_print_time >= PRINT_INTERVAL:
        output = f'フレーム {frame_count}: {len(persons)}人検出'
        for i, p in enumerate(persons):
            output += f' | 人物{i+1}: 信頼度{p["detection_conf"]:.0%}'
        output += f' | 判定: {presence_level}'
        print(output)

        results_log.append(output)
        last_print_time = current_time

    # 描画処理
    for i, p in enumerate(persons):
        x1, y1, x2, y2 = p['box']
        color = PERSON_COLOR

        # バウンディングボックス
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

        # ラベル表示
        label1 = f'Person {i+1}'
        label2 = f'Conf:{p["detection_conf"]:.1%}'

        cv2.putText(frame, label1, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
        cv2.putText(frame, label2, (x1, y2+15), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)

    # システム情報表示
    info1 = f'MediaPipe | Frame: {frame_count}'
    info2 = 'Press: q=Quit'

    cv2.putText(frame, info1, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
    cv2.putText(frame, info2, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 1)

    # 日本語表示のためPillowで描画
    img_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(img_pil)
    font = ImageFont.truetype("C:/Windows/Fonts/msgothic.ttc", 30)

    # 判定結果を日本語で表示
    draw.text((10, 90), presence_level, font=font, fill=(0, 255, 0))
    draw.text((10, 130), f'人数: {person_count}', font=font, fill=(255, 0, 0))

    # RGB→BGR変換
    frame = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)

    return frame


# 入力選択
print('0: 動画ファイル')
print('1: カメラ')
print('2: サンプル動画')

choice = input('選択: ')
temp_file = None

if choice == '0':
    root = tk.Tk()
    root.withdraw()
    path = filedialog.askopenfilename()
    if not path:
        exit()
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        print(f'動画ファイルを開けませんでした: {path}')
        exit()
elif choice == '1':
    cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, WINDOW_WIDTH)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, WINDOW_HEIGHT)
    cap.set(cv2.CAP_PROP_FPS, FPS)
    if not cap.isOpened():
        print('カメラを開けませんでした')
        exit()
elif choice == '2':
    # サンプル動画ダウンロード・処理
    try:
        urllib.request.urlretrieve(SAMPLE_URL, SAMPLE_FILE)
        temp_file = SAMPLE_FILE
        cap = cv2.VideoCapture(SAMPLE_FILE)
        if not cap.isOpened():
            print(f'サンプル動画を開けませんでした: {SAMPLE_FILE}')
            exit()
        print('サンプル動画のダウンロードが完了しました')
    except Exception as e:
        print(f'動画のダウンロードに失敗しました: {SAMPLE_URL}')
        print(f'エラー: {e}')
        exit()
else:
    print('無効な選択です')
    exit()

# 動画のFPSを取得
actual_fps = cap.get(cv2.CAP_PROP_FPS)
if actual_fps == 0 or actual_fps is None:
    actual_fps = FPS  # デフォルト値を使用

# 動画処理開始メッセージ
print('\n=== 動画処理開始 ===')
print('操作方法:')
print('  q キー: プログラム終了')
print()

# メイン処理
timestamp_us = 0
frame_duration_us = int(1000000 / actual_fps)  # マイクロ秒単位のフレーム間隔
try:
    while True:
        cap.grab()
        ret, frame = cap.retrieve()
        if not ret:
            break

        processed_frame = video_processing(frame, timestamp_us)
        cv2.imshow('MediaPipe Person Detection', processed_frame)

        # タイムスタンプを更新(マイクロ秒単位)
        timestamp_us += frame_duration_us

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cap.release()
    cv2.destroyAllWindows()

    # 結果保存
    if results_log:
        with open(RESULT_FILE, 'w', encoding='utf-8') as f:
            f.write('=== MediaPipe人物検出結果 ===\n')
            f.write(f'処理フレーム数: {frame_count}\n')
            f.write('\n')
            f.write('\n'.join(results_log))

        print(f'\n処理結果を{RESULT_FILE}に保存しました')

    if temp_file and os.path.exists(temp_file):
        os.remove(temp_file)

print('\n=== プログラム終了 ===')