Ultralytics ReID統合版によるマルチオブジェクトトラッキング(ソースコードと実行結果)

Python開発環境,ライブラリ類

ここでは、最低限の事前準備について説明する。機械学習や深層学習を行う場合は、NVIDIA CUDA、Visual Studio、Cursorなどを追加でインストールすると便利である。これらについては別ページ https://www.kkaneko.jp/cc/dev/aiassist.htmlで詳しく解説しているので、必要に応じて参照してください。

Python 3.12 のインストール(Windows 上) [クリックして展開]

以下のいずれかの方法で Python 3.12 をインストールする。Python がインストール済みの場合、この手順は不要である。

方法1:winget によるインストール

管理者権限コマンドプロンプトで以下を実行する。管理者権限のコマンドプロンプトを起動するには、Windows キーまたはスタートメニューから「cmd」と入力し、表示された「コマンドプロンプト」を右クリックして「管理者として実行」を選択する。

winget install --scope machine --id Python.Python.3.12 -e --silent --disable-interactivity --force --accept-source-agreements --accept-package-agreements --override "/quiet InstallAllUsers=1 PrependPath=1 Include_pip=1 Include_test=0 Include_launcher=1 InstallLauncherAllUsers=1"

--scope machine を指定することで、システム全体(全ユーザー向け)にインストールされる。このオプションの実行には管理者権限が必要である。インストール完了後、コマンドプロンプトを再起動すると PATH が自動的に設定される。

方法2:インストーラーによるインストール

  1. Python 公式サイト(https://www.python.org/downloads/)にアクセスし、「Download Python 3.x.x」ボタンから Windows 用インストーラーをダウンロードする。
  2. ダウンロードしたインストーラーを実行する。
  3. 初期画面の下部に表示される「Add python.exe to PATH」に必ずチェックを入れてから「Customize installation」を選択する。このチェックを入れ忘れると、コマンドプロンプトから python コマンドを実行できない。
  4. 「Install Python 3.xx for all users」にチェックを入れ、「Install」をクリックする。

インストールの確認

コマンドプロンプトで以下を実行する。

python --version

バージョン番号(例:Python 3.12.x)が表示されればインストール成功である。「'python' は、内部コマンドまたは外部コマンドとして認識されていません。」と表示される場合は、インストールが正常に完了していない。

AIエディタ Windsurf のインストール(Windows 上) [クリックして展開]

Pythonプログラムの編集・実行には、AIエディタの利用を推奨する。ここでは、Windsurfのインストールを説明する。Windsurf がインストール済みの場合、この手順は不要である。

管理者権限コマンドプロンプトで以下を実行する。管理者権限のコマンドプロンプトを起動するには、Windows キーまたはスタートメニューから「cmd」と入力し、表示された「コマンドプロンプト」を右クリックして「管理者として実行」を選択する。

winget install --scope machine --id Codeium.Windsurf -e --silent --disable-interactivity --force --accept-source-agreements --accept-package-agreements --custom "/SP- /SUPPRESSMSGBOXES /NORESTART /CLOSEAPPLICATIONS /DIR=""C:\Program Files\Windsurf"" /MERGETASKS=!runcode,addtopath,associatewithfiles,!desktopicon"
powershell -Command "$env:Path=[System.Environment]::GetEnvironmentVariable('Path','Machine')+';'+[System.Environment]::GetEnvironmentVariable('Path','User'); windsurf --install-extension MS-CEINTL.vscode-language-pack-ja --force; windsurf --install-extension ms-python.python --force; windsurf --install-extension Codeium.windsurfPyright --force"

--scope machine を指定することで、システム全体(全ユーザー向け)にインストールされる。このオプションの実行には管理者権限が必要である。インストール完了後、コマンドプロンプトを再起動すると PATH が自動的に設定される。

関連する外部ページ

Windsurf の公式ページ: https://windsurf.com/

必要なライブラリをシステム領域にインストール

管理者権限コマンドプロンプトで以下を実行する。管理者権限のコマンドプロンプトを起動するには、Windows キーまたはスタートメニューから「cmd」と入力し、表示された「コマンドプロンプト」を右クリックして「管理者として実行」を選択する。

REM PyTorch をインストール(GPU対応版)
set "CUDA_TAG=cu126"
set "PYTHON_PATH=C:\Program Files\Python312"
"%PYTHON_PATH%\Scripts\pip" install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/%CUDA_TAG%
pip install ultralytics opencv-python pillow lap

Ultralytics ReID統合版によるマルチオブジェクトトラッキング

概要

ソースコード

# -*- coding: utf-8 -*-
"""
Ultralytics ReID統合版によるマルチオブジェクトトラッキング
"""

import cv2
import tkinter as tk
from tkinter import filedialog
import os
import numpy as np
from ultralytics import YOLO
from PIL import Image, ImageDraw, ImageFont
import urllib.request
import time
import yaml

# 設定値
CONFIDENCE_THRESHOLD = 0.3
REID_CONFIDENCE_THRESHOLD = 0.5
TRACK_HIGH_THRESH = 0.25
TRACK_LOW_THRESH = 0.1
NEW_TRACK_THRESH = 0.25
TRACK_BUFFER = 30
MATCH_THRESH = 0.8
APPEARANCE_THRESH = 0.25
PROXIMITY_THRESH = 0.5
DISPLAY_INTERVAL = 1.0
FONT_SIZE = 20
FONT_PATH = 'C:/Windows/Fonts/meiryo.ttc'

# COCO 80クラス色パレット
CLASS_COLORS = [
    (255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (255, 0, 255),
    (0, 255, 255), (128, 0, 0), (0, 128, 0), (0, 0, 128), (128, 128, 0),
    (128, 0, 128), (0, 128, 128), (192, 192, 192), (128, 128, 128), (255, 165, 0),
    (255, 20, 147), (0, 191, 255), (255, 69, 0), (255, 140, 0), (173, 255, 47),
    (240, 230, 140), (220, 20, 60), (0, 100, 0), (255, 105, 180), (75, 0, 130),
    (255, 215, 0), (186, 85, 211), (147, 112, 219), (218, 112, 214), (255, 182, 193),
    (176, 196, 222), (255, 160, 122), (205, 92, 92), (240, 128, 128), (221, 160, 221),
    (255, 228, 181), (255, 222, 173), (245, 222, 179), (222, 184, 135), (210, 180, 140),
    (188, 143, 143), (105, 105, 105), (119, 136, 153), (112, 128, 144), (47, 79, 79),
    (85, 107, 47), (154, 205, 50), (127, 255, 0), (255, 127, 80), (255, 99, 71),
    (255, 215, 0), (255, 20, 147), (255, 69, 0), (255, 140, 0), (255, 165, 0),
    (255, 192, 203), (160, 82, 45), (205, 133, 63), (72, 61, 139), (106, 90, 205),
    (123, 104, 238), (72, 209, 204), (199, 21, 133), (25, 25, 112), (255, 0, 255),
    (255, 20, 147), (138, 43, 226), (30, 144, 255), (255, 105, 180), (255, 69, 0),
    (255, 140, 0), (70, 130, 180), (176, 224, 230), (139, 69, 19), (160, 82, 45),
    (210, 105, 30), (205, 92, 92), (184, 134, 11), (218, 165, 32), (238, 203, 173)
]

def get_class_color(class_id):
    if class_id < len(CLASS_COLORS):
        return CLASS_COLORS[class_id]
    else:
        np.random.seed(class_id)
        return tuple(np.random.randint(0, 255, 3).tolist())

def create_botsort_config():
    return {
        'tracker_type': 'botsort',
        'track_high_thresh': TRACK_HIGH_THRESH,
        'track_low_thresh': TRACK_LOW_THRESH,
        'new_track_thresh': NEW_TRACK_THRESH,
        'track_buffer': TRACK_BUFFER,
        'match_thresh': MATCH_THRESH,
        'fuse_score': True,
        'gmc_method': 'sparseOptFlow',
        'with_reid': True,
        'model': 'auto',
        'proximity_thresh': PROXIMITY_THRESH,
        'appearance_thresh': APPEARANCE_THRESH
    }

def save_tracker_config(config):
    config_path = "botsort_reid.yaml"
    with open(config_path, 'w', encoding='utf-8') as f:
        yaml.dump(config, f, default_flow_style=False, allow_unicode=True)
    return config_path

class MOTMetrics:
    def __init__(self):
        self.total_detections = 0
        self.total_tracks = 0
        self.active_tracks = set()
        self.track_history = {}
        self.id_switches = 0
        self.processing_times = []
        self.previous_tracks = {}
        self.frame_count = 0

    def update(self, detections_data, processing_time):
        self.frame_count += 1
        self.processing_times.append(processing_time)

        current_tracks = {}
        detection_count = len(detections_data)
        track_count = 0

        for detection in detections_data:
            self.total_detections += 1

            if detection['tracker_id'] is not None:
                track_id = detection['tracker_id']
                track_count += 1
                current_tracks[track_id] = {
                    'center': detection['center'],
                    'confidence': detection['confidence']
                }

                if track_id not in self.track_history:
                    self.track_history[track_id] = []
                self.track_history[track_id].append(self.frame_count)
                self.active_tracks.add(track_id)

        self.total_tracks += track_count
        self._detect_id_switches(current_tracks)
        self.previous_tracks = current_tracks.copy()

    def _detect_id_switches(self, current_tracks):
        if not self.previous_tracks or not current_tracks:
            return

        for prev_id, prev_data in self.previous_tracks.items():
            prev_center = prev_data['center']
            min_distance = float('inf')
            closest_id = None

            for curr_id, curr_data in current_tracks.items():
                curr_center = curr_data['center']
                distance = np.sqrt((prev_center[0] - curr_center[0])**2 +
                                 (prev_center[1] - curr_center[1])**2)

                if distance < min_distance:
                    min_distance = distance
                    closest_id = curr_id

            if closest_id and min_distance < 50 and closest_id != prev_id:
                if prev_id not in current_tracks:
                    self.id_switches += 1

    def calculate_mot_metrics(self):
        if self.total_detections == 0:
            return {
                'MOTA': 0.0, 'MOTP': 0.0, 'IDP': 0.0, 'IDR': 0.0, 'IDF1': 0.0,
                'ID_Switches': 0, 'False_Negatives': 0, 'Total_Tracks': 0,
                'Avg_Processing_Time': 0
            }

        fn = self.total_detections - self.total_tracks
        fp = max(0, self.total_tracks - self.total_detections)
        ids = self.id_switches

        mota = max(0.0, 1.0 - (fn + fp + ids) / max(1, self.total_detections))
        motp = self.total_tracks / max(1, self.total_detections)
        idp = max(0.0, (self.total_tracks - ids) / max(1, self.total_tracks))
        idr = max(0.0, (self.total_tracks - ids) / max(1, self.total_detections))
        idf1 = 2 * (idp * idr) / max(0.001, idp + idr)

        return {
            'MOTA': mota * 100, 'MOTP': motp * 100, 'IDP': idp * 100,
            'IDR': idr * 100, 'IDF1': idf1 * 100, 'ID_Switches': ids,
            'False_Negatives': fn, 'Total_Tracks': len(self.active_tracks),
            'Avg_Processing_Time': np.mean(self.processing_times) if self.processing_times else 0
        }

    def get_stats_text(self):
        if self.frame_count == 0:
            return "MOT統計: 初期化中"
        metrics = self.calculate_mot_metrics()
        return (f"MOT統計 - MOTA: {metrics['MOTA']:.1f}% | "
                f"IDF1: {metrics['IDF1']:.1f}% | "
                f"ID切替: {metrics['ID_Switches']} | "
                f"アクティブ追跡: {metrics['Total_Tracks']}")

def process_frame(frame, model, tracker_config_path, font, log, frame_count, last_print_time, mot_metrics):
    start_time = time.time()

    try:
        results = model.track(
            frame, persist=True, conf=CONFIDENCE_THRESHOLD,
            tracker=tracker_config_path, verbose=False
        )

        result = results[0]
        detections_data = []
        detection_count = 0
        tracked_count = 0

        if result.boxes is not None:
            boxes = result.boxes
            detection_count = len(boxes.xyxy)
            has_tracker_ids = hasattr(boxes, 'id') and boxes.id is not None

            for i in range(detection_count):
                x1, y1, x2, y2 = boxes.xyxy[i].cpu().numpy()
                center_x = (x1 + x2) / 2
                center_y = (y1 + y2) / 2
                confidence = boxes.conf[i].cpu().numpy() if boxes.conf is not None else 0.0

                class_name = "object"
                class_id = 0
                if boxes.cls is not None and hasattr(result, 'names'):
                    class_id = int(boxes.cls[i].cpu().numpy())
                    class_name = result.names.get(class_id, "object")

                if has_tracker_ids and i < len(boxes.id):
                    tracker_id = int(boxes.id[i].cpu().numpy())
                    tracked_count += 1
                else:
                    tracker_id = None

                reid_applied = confidence > REID_CONFIDENCE_THRESHOLD

                detections_data.append({
                    'bbox': (int(x1), int(y1), int(x2), int(y2)),
                    'center': (center_x, center_y),
                    'confidence': confidence,
                    'class_name': class_name,
                    'class_id': class_id,
                    'tracker_id': tracker_id,
                    'reid_applied': reid_applied
                })

        processing_time = time.time() - start_time
        mot_metrics.update(detections_data, processing_time)

        # フレーム描画
        ann_frame = frame.copy()

        if detections_data:
            current_time = time.strftime("%Y-%m-%d %H:%M:%S")

            # まず OpenCV で四角を描画
            for detection in detections_data:
                x1, y1, x2, y2 = detection['bbox']
                class_id = detection['class_id']
                bbox_color = get_class_color(class_id)

                # バウンディングボックス描画(クラス別色)
                cv2.rectangle(ann_frame, (x1, y1), (x2, y2), bbox_color, 2)

            # 次に PIL でテキストを描画
            img_pil = Image.fromarray(cv2.cvtColor(ann_frame, cv2.COLOR_BGR2RGB))
            draw = ImageDraw.Draw(img_pil)

            for detection in detections_data:
                x1, y1, x2, y2 = detection['bbox']
                class_id = detection['class_id']
                class_name = detection['class_name']
                bbox_color = get_class_color(class_id)

                if detection['tracker_id'] is not None:
                    main_text = f"ID:{detection['tracker_id']} {class_name}"
                    reid_status = "ReID" if detection['reid_applied'] else "基本"
                    status_text = f"({reid_status})"
                else:
                    main_text = f"{class_name} (検出)"
                    status_text = ""

                text_bg_color = tuple(int(c * 0.3) for c in bbox_color)
                text_color = (255, 255, 255)

                try:
                    bbox = draw.textbbox((0, 0), main_text, font=font)
                    text_width = bbox[2] - bbox[0]
                except AttributeError:
                    text_size = draw.textsize(main_text, font=font)
                    text_width = text_size[0]

                bg_height = 50 if status_text else 30

                # テキスト背景をPILで描画
                draw.rectangle([x1, y1-bg_height, x1 + text_width + 10, y1], fill=text_bg_color)
                draw.text((x1 + 5, y1-45), main_text, font=font, fill=text_color)
                if status_text:
                    draw.text((x1 + 5, y1-25), status_text, font=font, fill=text_color)

                log_entry = (f"{current_time},ID:{detection['tracker_id']},クラス:{detection['class_name']},"
                           f"信頼度:{detection['confidence']:.3f},ReID:{detection['reid_applied']},"
                           f"座標:({detection['center'][0]:.1f},{detection['center'][1]:.1f})")
                log.append(log_entry)

            ann_frame = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)

        # 統計情報表示
        img_pil_status = Image.fromarray(cv2.cvtColor(ann_frame, cv2.COLOR_BGR2RGB))
        draw_status = ImageDraw.Draw(img_pil_status)

        status_text = f"フレーム: {frame_count} | 検出: {detection_count} | 追跡: {tracked_count}"
        draw_status.text((10, ann_frame.shape[0] - 85), status_text, font=font, fill=(255, 255, 255))

        mot_stats_text = mot_metrics.get_stats_text()
        draw_status.text((10, ann_frame.shape[0] - 60), mot_stats_text, font=font, fill=(0, 255, 255))

        mot_detailed = mot_metrics.calculate_mot_metrics()
        detailed_text = f"MOTA: {mot_detailed['MOTA']:.1f}% | IDF1: {mot_detailed['IDF1']:.1f}% | ID切替: {mot_detailed['ID_Switches']}"
        draw_status.text((10, ann_frame.shape[0] - 35), detailed_text, font=font, fill=(255, 165, 0))

        ann_frame = cv2.cvtColor(np.array(img_pil_status), cv2.COLOR_RGB2BGR)

        # 定期出力
        current_time_sec = time.time()
        if current_time_sec - last_print_time >= DISPLAY_INTERVAL:
            if detections_data:
                print(f"\n[フレーム {frame_count}] 検出: {detection_count} | 追跡: {tracked_count}")
                print(f"MOTA: {mot_detailed['MOTA']:.2f}% | IDF1: {mot_detailed['IDF1']:.2f}% | ID切替: {mot_detailed['ID_Switches']}")
            last_print_time = current_time_sec

        return ann_frame, last_print_time

    except Exception as e:
        print(f"トラッキングエラー: {e}")
        return frame, last_print_time

def main():
    print("=" * 60)
    print("マルチオブジェクトトラッキングシステム")
    print("=" * 60)

    print("\n【入力ソース選択】")
    print("0: 動画ファイル")
    print("1: カメラ")
    print("2: サンプル動画")

    choice = input("\n選択してください: ")
    temp_file = None
    log = []

    print("\nシステム初期化中...")

    try:
        model = YOLO("yolo11n.pt")
        print("✓ YOLO11モデル読み込み完了")

        tracker_config = create_botsort_config()
        tracker_config_path = save_tracker_config(tracker_config)
        print("✓ トラッカー設定完了")

        font = ImageFont.truetype(FONT_PATH, FONT_SIZE)
        print("✓ フォント設定完了")

        mot_metrics = MOTMetrics()
        print("✓ MOT評価指標初期化完了")

        print("初期化完了!")

    except Exception as e:
        print(f"初期化エラー: {e}")
        return

    # 入力ソース設定
    if choice == '0':
        print("動画ファイルを選択してください...")
        root = tk.Tk()
        root.withdraw()
        path = filedialog.askopenfilename(
            title="動画ファイルを選択",
            filetypes=[("動画ファイル", "*.mp4 *.avi *.mov *.mkv"), ("全ファイル", "*.*")]
        )
        root.destroy()
        if not path:
            print("ファイルが選択されませんでした。")
            return
        cap = cv2.VideoCapture(path)
    elif choice == '1':
        cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
        if not cap.isOpened():
            cap = cv2.VideoCapture(0)
        if not cap.isOpened():
            print("カメラを開けませんでした。")
            return
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        if not cap.isOpened():
            print("カメラを開けませんでした。")
            return
    elif choice == '2':
        url = "https://raw.githubusercontent.com/opencv/opencv/master/samples/data/vtest.avi"
        filename = "vtest.avi"
        print("サンプル動画をダウンロード中...")
        try:
            urllib.request.urlretrieve(url, filename)
            temp_file = filename
            cap = cv2.VideoCapture(filename)
            print("サンプル動画準備完了")
        except Exception as e:
            print(f"サンプル動画ダウンロードエラー: {e}")
            return
    else:
        print("無効な選択です。")
        return

    print("\nトラッキング開始 - 'q'で終了")

    frame_count = 0
    last_print_time = time.time()

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("動画の最後に到達しました。")
                break

            frame_count += 1
            processed_frame, last_print_time = process_frame(
                frame, model, tracker_config_path, font, log, frame_count,
                last_print_time, mot_metrics
            )

            cv2.imshow('Multi-Object Tracking', processed_frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                print("\n終了が要求されました。")
                break

    except KeyboardInterrupt:
        print("\nキーボード割り込みにより終了します。")
    except Exception as e:
        print(f"処理中にエラー: {e}")
    finally:
        cap.release()
        cv2.destroyAllWindows()
        if temp_file and os.path.exists(temp_file):
            os.remove(temp_file)
        if os.path.exists(tracker_config_path):
            os.remove(tracker_config_path)

    # 結果保存
    print("\n処理結果を保存中...")

    try:
        final_mot_metrics = mot_metrics.calculate_mot_metrics()

        with open('tracking_result.txt', 'w', encoding='utf-8') as f:
            f.write("トラッキング結果\n")
            f.write("=" * 30 + "\n")
            f.write(f"MOTA: {final_mot_metrics['MOTA']:.2f}%\n")
            f.write(f"IDF1: {final_mot_metrics['IDF1']:.2f}%\n")
            f.write(f"ID切替数: {final_mot_metrics['ID_Switches']}\n")
            f.write(f"総フレーム数: {frame_count}\n")
            f.write(f"総検出数: {mot_metrics.total_detections}\n")
            f.write(f"総追跡数: {mot_metrics.total_tracks}\n")
            f.write("-" * 30 + "\n")
            for log_entry in log:
                f.write(log_entry + "\n")

        print("結果をtracking_result.txtに保存しました")
        print(f"MOTA: {final_mot_metrics['MOTA']:.2f}%")
        print(f"IDF1: {final_mot_metrics['IDF1']:.2f}%")
        print("プログラム終了")

    except Exception as e:
        print(f"結果保存エラー: {e}")

if __name__ == "__main__":
    main()