YOLOv10による物体検出・ByteTrackによる追跡とTTAの機能付き(ソースコードと説明と利用ガイド)

Python開発環境,ライブラリ類

ここでは、最低限の事前準備について説明する。機械学習や深層学習を行う場合は、NVIDIA CUDA、Visual Studio、Cursorなどを追加でインストールすると便利である。これらについては別ページ https://www.kkaneko.jp/cc/dev/aiassist.htmlで詳しく解説しているので、必要に応じて参照してください。

Python 3.12 のインストール

インストール済みの場合は実行不要。

管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要である。

REM Python をシステム領域にインストール
winget install --scope machine --id Python.Python.3.12 -e --silent
REM Python のパス設定
set "PYTHON_PATH=C:\Program Files\Python312"
set "PYTHON_SCRIPTS_PATH=C:\Program Files\Python312\Scripts"
echo "%PATH%" | find /i "%PYTHON_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_PATH%" /M >nul
echo "%PATH%" | find /i "%PYTHON_SCRIPTS_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_SCRIPTS_PATH%" /M >nul

関連する外部ページ

Python の公式ページ: https://www.python.org/

AI エディタ Windsurf のインストール

Pythonプログラムの編集・実行には、AI エディタの利用を推奨する。ここでは,Windsurfのインストールを説明する。

管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行して、Windsurfをシステム全体にインストールする。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要となる。

winget install --scope machine Codeium.Windsurf -e --silent

関連する外部ページ

Windsurf の公式ページ: https://windsurf.com/

必要なライブラリのインストール

コマンドプロンプトを管理者として実行(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する


pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip install ultralytics opencv-python numpy pillow boxmot

YOLOv10による物体検出プログラム・ByteTrackによる追跡とTTAの機能付き

概要

このプログラムは、動画フレームから人物を検出し、その位置をバウンディングボックスで特定する。

主要技術

参考文献

ソースコード


# プログラム名: YOLOv10による物体検出プログラム・ByteTrackによる追跡とTTAの機能付き
# 特徴技術名: YOLOv10
# 出典: Wang, A., Chen, H., Liu, L., Chen, K., Lin, Z., Han, J., & Ding, G. (2024). YOLOv10: Real-Time End-to-End Object Detection. arXiv:2405.14458.
# 特徴機能: NMSフリー検出機能。Non-Maximum Suppressionを不要とする設計により、後処理なしで物体検出を実現。本プログラムではpersonクラスのみを検出。TTA(Test Time Augmentation)とByteTrack追跡機能を追加
# 学習済みモデル: YOLOv10 COCO事前学習済みモデル(personクラスのみ使用)
#   モデルサイズ選択可能(デフォルト: n):
#   n (nano): yolov10n.pt - 最小
#   s (small): yolov10s.pt - 小
#   m (medium): yolov10m.pt - 中
#   b (balanced): yolov10b.pt - 中上
#   l (large): yolov10l.pt - 大
#   x (extra large): yolov10x.pt - 最大
# 方式設計:
#   - 関連利用技術:
#     - PyTorch: 深層学習フレームワーク、CUDA対応
#     - OpenCV: 画像処理、カメラ制御、描画処理、動画入出力管理
#     - ByteTrack: カルマンフィルタとハンガリアンアルゴリズムによる物体追跡(boxmotパッケージ版)
#     - TTA (Test Time Augmentation): 複数の画像変換で推論し結果を統合
#   - 入力と出力:
#     入力: 動画(0:動画ファイル,1:カメラ,2:サンプル動画)
#     出力: 各フレームごとに全検出一覧をprint表示し、終了時にresult.txtに保存
#   - 処理手順: 1.フレーム取得、2.CLAHE前処理、3.TTA適用、4.YOLOv10推論、5.personクラス検出、6.ByteTrack追跡、7.信頼度閾値で選別、8.バウンディングボックス描画
#   - 前処理/後処理: 前処理はCLAHE(YUV色空間)とYOLO内部処理(640x640リサイズ、正規化)。後処理は信頼度閾値によるフィルタリングとByteTrack追跡による安定化
#   - 追加処理: デバイス自動選択(GPU/CPU)、日本語クラス名表示、TTA、ByteTrack追跡
#   - 調整可能値: CONF_THRESH(検出信頼度閾値、デフォルト0.5)、TTA_ENABLED(TTAの有効/無効、デフォルトTrue)、USE_TRACKER(トラッカーの使用有無、デフォルトTrue)
# 環境: Windows想定(PillowでMeiryoフォントを使用)
# 前準備:
#   - pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
#   - pip install ultralytics opencv-python numpy pillow boxmot
# 特徴技術および学習済モデルの利用制限: YOLOv10はAGPL-3.0ライセンス。商用利用時はUltralyticsへの連絡が必要。ByteTrackはMITライセンス。必ず利用者自身で利用制限を確認すること。

import os
import time
import urllib.request
import tkinter as tk
from tkinter import filedialog
from datetime import datetime
import sys
import io
import threading

import cv2
import numpy as np
import torch
import torchvision
from ultralytics import YOLO
from PIL import Image, ImageDraw, ImageFont
from boxmot import ByteTrack

# Windows文字エンコーディング設定
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', line_buffering=True)

# GPU/CPU自動選択
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'デバイス: {str(device)}')

# GPU使用時の最適化
if device.type == 'cuda':
    torch.backends.cudnn.benchmark = True

# モデル情報の構造化
MODEL_INFO = {
    'n': {'name': 'Nano', 'desc': '最小'},
    's': {'name': 'Small', 'desc': '小'},
    'm': {'name': 'Medium', 'desc': '中'},
    'b': {'name': 'Balanced', 'desc': '中上'},
    'l': {'name': 'Large', 'desc': '大'},
    'x': {'name': 'Extra Large', 'desc': '最大'}
}

# 調整可能な設定値
CONF_THRESH = 0.5           # 信頼度閾値
IMG_SIZE = 640             # 推論画像サイズ
CLAHE_CLIP_LIMIT = 2.0     # CLAHE制限値
CLAHE_TILE_SIZE = (8, 8)   # CLAHEタイルサイズ
WINDOW_NAME = "YOLOv10物体検出"
TTA_ENABLED = True         # TTA(Test Time Augmentation)の有効/無効
TTA_CONF_BOOST = 0.03      # TTA使用時の信頼度ブースト値
NMS_THRESHOLD = 0.6        # TTA用のNMS閾値
USE_TRACKER = True         # トラッカーの使用有無

# CLAHEオブジェクトをグローバルスコープで一度だけ定義
clahe = cv2.createCLAHE(clipLimit=CLAHE_CLIP_LIMIT, tileGridSize=CLAHE_TILE_SIZE)

# ByteTrackトラッカーを初期化
tracker = None

# BGR→RGB色変換のヘルパー関数
def bgr_to_rgb(color_bgr):
    """BGRカラーをRGBカラーに変換"""
    return (int(color_bgr[2]), int(color_bgr[1]), int(color_bgr[0]))

# IDから色を生成する関数
def get_color_from_id(track_id):
    """IDをハッシュ化してHSV色空間の色を生成"""
    hue = int((track_id * 37) % 180)
    hsv = np.uint8([[[hue, 255, 255]]])
    bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)[0][0]
    return (int(bgr[0]), int(bgr[1]), int(bgr[2]))

# personクラス用のデフォルト色(トラッカー無効時)
PERSON_COLOR = (0, 255, 0)

# 日本語フォント設定
FONT_PATH = 'C:/Windows/Fonts/meiryo.ttc'
FONT_SIZE_MAIN = 20
FONT_SIZE_SMALL = 14
font_main = ImageFont.truetype(FONT_PATH, FONT_SIZE_MAIN)
font_small = ImageFont.truetype(FONT_PATH, FONT_SIZE_SMALL)

# グローバル変数
frame_count = 0
results_log = []
person_count = 0
model = None

class ThreadedVideoCapture:
    """スレッド化されたVideoCapture(常に最新フレームを取得)"""
    def __init__(self, src, is_camera=False):
        if is_camera:
            self.cap = cv2.VideoCapture(src, cv2.CAP_DSHOW)
            fourcc = cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')
            self.cap.set(cv2.CAP_PROP_FOURCC, fourcc)
            self.cap.set(cv2.CAP_PROP_FPS, 60)
        else:
            self.cap = cv2.VideoCapture(src)

        self.grabbed, self.frame = self.cap.read()
        self.stopped = False
        self.lock = threading.Lock()
        self.thread = threading.Thread(target=self.update, args=())
        self.thread.daemon = True
        self.thread.start()

    def update(self):
        """バックグラウンドでフレームを取得し続ける"""
        while not self.stopped:
            grabbed, frame = self.cap.read()
            with self.lock:
                self.grabbed = grabbed
                if grabbed:
                    self.frame = frame

    def read(self):
        """最新フレームを返す"""
        with self.lock:
            return self.grabbed, self.frame.copy() if self.grabbed else None

    def isOpened(self):
        return self.cap.isOpened()

    def get(self, prop):
        return self.cap.get(prop)

    def release(self):
        self.stopped = True
        self.thread.join()
        self.cap.release()

def display_program_header():
    print('=== YOLOv10物体検出プログラム ===')
    print('概要: フレームごとに人物を検出し、矩形とラベルを描画して表示します')
    print('機能: YOLOv10によるpersonクラス検出(NMSフリー設計)+ TTA + ByteTrack追跡')
    print('技術: CLAHE(コントラスト強化)+ TTA(Test Time Augmentation)+ ByteTrack')
    print('操作: qキーで終了')
    print('出力: 各フレームの全検出一覧をprint表示し、終了時にresult.txtに保存')
    print('注意: YOLOv10モデルファイル(yolov10n.pt等)が必要です')
    print()

def apply_tta_inference(frame, model, conf):
    """Test Time Augmentation (TTA)を適用した推論"""
    frame_width = frame.shape[1]

    # 水平反転画像を作成
    flipped_frame = cv2.flip(frame, 1)

    # バッチ推論(元画像と反転画像を同時に処理)
    results = model.predict([frame, flipped_frame], conf=conf, imgsz=IMG_SIZE, verbose=False, device=str(device))

    # 元画像の結果を取得
    all_boxes = []
    all_confs = []
    all_classes = []

    if results[0].boxes is not None and len(results[0].boxes) > 0:
        boxes_orig = results[0].boxes.xyxy
        confs_orig = results[0].boxes.conf
        classes_orig = results[0].boxes.cls

        # personクラス(0)のみフィルタリング
        person_mask = classes_orig == 0
        if person_mask.sum() > 0:
            all_boxes.append(boxes_orig[person_mask])
            all_confs.append(confs_orig[person_mask])
            all_classes.append(classes_orig[person_mask])

    # 反転画像の結果を取得し、座標を元に戻す
    if len(results) > 1 and results[1].boxes is not None and len(results[1].boxes) > 0:
        boxes_flipped = results[1].boxes.xyxy.clone()
        confs_flipped = results[1].boxes.conf
        classes_flipped = results[1].boxes.cls

        # personクラス(0)のみフィルタリング
        person_mask = classes_flipped == 0
        if person_mask.sum() > 0:
            boxes_flipped = boxes_flipped[person_mask]
            confs_flipped = confs_flipped[person_mask]
            classes_flipped = classes_flipped[person_mask]

            # 水平反転画像での検出結果を元の画像座標系に変換
            # x1, x2 の大小関係を保つ必要がある
            if boxes_flipped.shape[0] > 0:
                x1_flipped = boxes_flipped[:, 0].clone()
                x2_flipped = boxes_flipped[:, 2].clone()
                # 元の画像座標系での新しい座標
                boxes_flipped[:, 0] = frame_width - 1 - x2_flipped  # 新しいx1(左端)
                boxes_flipped[:, 2] = frame_width - 1 - x1_flipped  # 新しいx2(右端)

            all_boxes.append(boxes_flipped)
            all_confs.append(confs_flipped)
            all_classes.append(classes_flipped)

    # 結果が空の場合は空リストを返す
    if len(all_boxes) == 0:
        return []

    # 全ての結果を結合
    all_boxes = torch.cat(all_boxes, dim=0)
    all_confs = torch.cat(all_confs, dim=0)
    all_classes = torch.cat(all_classes, dim=0)

    # 信頼度閾値でフィルタリング(NMS前に実施)
    valid_indices = all_confs > conf
    if valid_indices.sum() > 0:
        all_boxes = all_boxes[valid_indices]
        all_confs = all_confs[valid_indices]
        all_classes = all_classes[valid_indices]

        # torchvisionのNMSを使用
        nms_indices = torchvision.ops.nms(all_boxes, all_confs, iou_threshold=NMS_THRESHOLD)
        final_boxes = all_boxes[nms_indices].cpu().numpy()
        final_confs = all_confs[nms_indices].cpu().numpy()
        final_classes = all_classes[nms_indices].cpu().numpy()

        # 結果をリスト形式に変換
        detections = []
        for i in range(len(final_confs)):
            # TTAで検出された場合、信頼度をブースト
            conf_boost = TTA_CONF_BOOST if TTA_ENABLED else 0
            detections.append({
                'x1': int(final_boxes[i][0]), 'y1': int(final_boxes[i][1]),
                'x2': int(final_boxes[i][2]), 'y2': int(final_boxes[i][3]),
                'conf': min(1.0, final_confs[i] + conf_boost),
                'class': int(final_classes[i])
            })

        return detections

    return []

def normal_inference(frame, model, conf):
    """通常の推論処理"""
    results = model.predict(frame, conf=conf, imgsz=IMG_SIZE, verbose=False, device=str(device))
    curr_dets = []

    res = results[0]
    if res.boxes is not None:
        boxes = res.boxes.xyxy.cpu().numpy()
        confs = res.boxes.conf.cpu().numpy()
        clses = res.boxes.cls.cpu().numpy()

        for i in range(len(boxes)):
            cls = int(clses[i])
            # personクラス(0)のみ処理
            if cls == 0:
                box = boxes[i].astype(int)
                conf_score = float(confs[i])
                curr_dets.append({
                    'x1': box[0], 'y1': box[1],
                    'x2': box[2], 'y2': box[3],
                    'conf': conf_score,
                    'class': cls
                })

    return curr_dets

def apply_tta_if_enabled(frame, model, conf):
    """TTA機能を条件付きで適用"""
    if not TTA_ENABLED:
        return normal_inference(frame, model, conf)
    return apply_tta_inference(frame, model, conf)

def apply_bytetrack(detections, frame):
    """ByteTrackerを使用したトラッキング処理"""
    global tracker

    # 検出結果が0件でもトラッカーの状態更新と予測結果取得を行う
    if len(detections) > 0:
        dets_array = np.array([[d['x1'], d['y1'], d['x2'], d['y2'], d['conf'], d['class']]
                               for d in detections])
    else:
        # 検出がない場合は空の配列を渡す
        dets_array = np.empty((0, 6))

    # 常にトラッカーを更新し、現在のフレームでの追跡結果(または予測結果)を取得する
    tracks = tracker.update(dets_array, frame)

    tracked_dets = []
    # tracker.updateが返す結果を処理する(検出0件でも予測結果が返る可能性がある)
    if len(tracks) > 0:
        for track in tracks:
            if len(track) >= 7:
                x1, y1, x2, y2, track_id, conf, cls = track[:7]
                tracked_dets.append({
                    'x1': int(x1), 'y1': int(y1),
                    'x2': int(x2), 'y2': int(y2),
                    'track_id': int(track_id),
                    'conf': float(conf),
                    'class': int(cls)
                })
    return tracked_dets

def apply_tracking_if_enabled(detections, frame):
    """トラッキング機能を条件付きで適用"""
    if not USE_TRACKER:
        return detections
    return apply_bytetrack(detections, frame)

def draw_detection_results(frame, detections):
    """物体検出の描画処理"""
    # バウンディングボックスを描画
    for det in detections:
        # IDに応じた色を決定
        if USE_TRACKER and 'track_id' in det:
            box_color = get_color_from_id(det['track_id'])
        else:
            box_color = PERSON_COLOR

        cv2.rectangle(frame, (det['x1'], det['y1']),
                     (det['x2'], det['y2']), box_color, 2)

    # 構造化されたテキスト描画を実行
    texts_to_draw = []
    for det in detections:
        if USE_TRACKER and 'track_id' in det:
            label = f"ID:{det['track_id']} 人"
        else:
            label = "人"
        texts_to_draw.append({
            'text': label,
            'org': (det['x1'], max(0, det['y1'] - 22)),
            'color': (255, 0, 0),
            'font_type': 'main'
        })
        texts_to_draw.append({
            'text': f"Conf:{det['conf']:.2f}",
            'org': (det['x1'], det['y2'] + 4),
            'color': (255, 255, 255),
            'font_type': 'small'
        })

    tta_status = "TTA:ON" if TTA_ENABLED else "TTA:OFF"
    tracker_status = "ByteTrack:ON" if USE_TRACKER else "ByteTrack:OFF"
    texts_to_draw.append({
        'text': f'YOLOv10 ({device.type}) | Frame: {frame_count} | Dets: {len(detections)} | {tta_status} | {tracker_status}',
        'org': (10, 10),
        'color': (255, 255, 255),
        'font_type': 'main'
    })
    texts_to_draw.append({
        'text': '操作: q=終了',
        'org': (10, 36),
        'color': (255, 255, 0),
        'font_type': 'small'
    })

    frame = draw_texts_with_pillow(frame, texts_to_draw)
    return frame

def format_detection_output(detections):
    """物体検出の出力フォーマット"""
    parts = [f'count={len(detections)}']
    for det in detections:
        x1, y1, x2, y2 = det['x1'], det['y1'], det['x2'], det['y2']
        conf = det['conf']
        if USE_TRACKER and 'track_id' in det:
            parts.append(f'ID={det["track_id"]},conf={conf:.3f},box=[{x1},{y1},{x2},{y2}]')
        else:
            parts.append(f'conf={conf:.3f},box=[{x1},{y1},{x2},{y2}]')
    return '; '.join(parts) if len(parts) == 1 else parts[0] + '; ' + ' | '.join(parts[1:])

def draw_texts_with_pillow(bgr_frame, texts):
    """Pillowを使用した日本語テキスト描画"""
    img_pil = Image.fromarray(cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(img_pil)

    for item in texts:
        text = item['text']
        x, y = item['org']
        color = item['color']
        font_type = item.get('font_type', 'main')
        font = font_main if font_type == 'main' else font_small
        draw.text((x, y), text, font=font, fill=color)

    return cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)

def detect_objects(frame):
    """共通の検出処理(CLAHE前処理、YOLOv10推論、検出を実行)"""
    global model

    # AIモデルの入力用にCLAHEを適用(YUV色空間で輝度チャンネルのみ処理)
    yuv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2YUV)
    yuv_frame[:, :, 0] = clahe.apply(yuv_frame[:, :, 0])
    enh_frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR)

    # TTA適用
    curr_dets = apply_tta_if_enabled(enh_frame, model, CONF_THRESH)

    return curr_dets

def process_video_frame(frame):
    """動画フレーム処理用ラッパー"""
    # 共通の検出処理
    detections = detect_objects(frame)

    # トラッキングを条件付きで適用
    tracked_dets = apply_tracking_if_enabled(detections, frame)

    # 検出数を更新
    global person_count
    person_count += len(tracked_dets)

    # 物体検出固有の描画処理
    frame = draw_detection_results(frame, tracked_dets)

    # 物体検出固有の出力フォーマット
    result = format_detection_output(tracked_dets)

    return frame, result

def video_frame_processing(frame, timestamp_ms, is_camera):
    """動画フレーム処理のメイン関数"""
    global frame_count
    current_time = time.time()
    frame_count += 1

    processed_frame, result = process_video_frame(frame)
    return processed_frame, result, current_time

# プログラムヘッダー表示
display_program_header()

# ByteTrackとTTAの有効化選択
print("=== 機能選択 ===")
print("1: ByteTrack, TTA (Test Time Augmentation) 無効化")
print("2: ByteTrack, TTA (Test Time Augmentation) 有効化")
feature_choice = input("選択 (1/2) [デフォルト: 1]: ").strip()

if feature_choice == '2':
    TTA_ENABLED = True
    USE_TRACKER = True
    print("ByteTrackとTTAを有効化しました")
else:
    TTA_ENABLED = False
    USE_TRACKER = False
    print("ByteTrackとTTAを無効化しました")

# トラッカー初期化(USE_TRACKERがTrueの場合のみ)
if USE_TRACKER:
    tracker = ByteTrack()

print()

# モデル選択
print("=== モデル選択 ===")
print('使用するYOLOv10モデルを選択してください:')
for key, info in MODEL_INFO.items():
    print(f'{key}: {info["name"]} - {info["desc"]}')
print()

model_choice = ''
while model_choice not in MODEL_INFO.keys():
    model_choice = input("選択 (n/s/m/b/l/x) [デフォルト: n]: ").strip().lower()
    if model_choice == '':
        model_choice = 'n'
        print('デフォルト(n)を使用します')
        break
    if model_choice not in MODEL_INFO.keys():
        print("無効な選択です。もう一度入力してください。")

MODEL_NAME = f'yolov10{model_choice}.pt'
print(f'選択されたモデル: {MODEL_NAME}')

# モデルの初期化
print(f"\nモデル初期化中...")
try:
    model = YOLO(MODEL_NAME)
    model.to(device)
    model.eval()
    print('モデル初期化が完了しました')
except Exception as e:
    print('モデルの初期化に失敗しました')
    print(f'エラー: {e}')
    print('注意: モデルファイルの存在とパスを確認してください')
    raise SystemExit(1)

# TTA設定の表示
if TTA_ENABLED:
    print("\nTest Time Augmentation (TTA): 有効")
    print("  - 水平反転による推論結果の統合")
    print(f"  - 信頼度ブースト値: {TTA_CONF_BOOST}")
    print(f"  - NMS閾値: {NMS_THRESHOLD}")
else:
    print("\nTest Time Augmentation (TTA): 無効")

# ByteTrack設定の表示
if USE_TRACKER:
    print("\nByteTrack: 有効")
    print("  - カルマンフィルタによる動き予測")
else:
    print("\nByteTrack: 無効")

# 入力選択
print("\n=== 入力ソース選択 ===")
print("0: 動画ファイル")
print("1: カメラ")
print("2: サンプル動画")

choice = input("選択: ")
is_camera = (choice == '1')

if choice == '0':
    root = tk.Tk()
    root.withdraw()
    path = filedialog.askopenfilename()
    if not path:
        raise SystemExit(0)
    cap = cv2.VideoCapture(path)
elif choice == '1':
    cap = ThreadedVideoCapture(0, is_camera=True)
else:
    print("サンプル動画をダウンロード中...")
    SAMPLE_URL = 'https://raw.githubusercontent.com/opencv/opencv/master/samples/data/vtest.avi'
    SAMPLE_FILE = 'vtest.avi'
    urllib.request.urlretrieve(SAMPLE_URL, SAMPLE_FILE)
    cap = cv2.VideoCapture(SAMPLE_FILE)

if not cap.isOpened():
    print('動画ファイル・カメラを開けませんでした')
    raise SystemExit(1)

# フレームレートの取得とタイムスタンプ増分の計算
if is_camera:
    actual_fps = cap.get(cv2.CAP_PROP_FPS)
    print(f'カメラのfps: {actual_fps}')
    timestamp_increment = int(1000 / actual_fps) if actual_fps > 0 else 33
else:
    video_fps = cap.get(cv2.CAP_PROP_FPS)
    timestamp_increment = int(1000 / video_fps) if video_fps > 0 else 33

# メイン処理
print('\n=== 動画処理開始 ===')
print('操作方法:')
print('  q キー: プログラム終了')

start_time = time.time()
last_info_time = start_time
info_interval = 10.0
timestamp_ms = 0
total_processing_time = 0.0

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        timestamp_ms += timestamp_increment

        processing_start = time.time()
        processed_frame, result, current_time = video_frame_processing(frame, timestamp_ms, is_camera)
        processing_time = time.time() - processing_start
        total_processing_time += processing_time

        cv2.imshow(WINDOW_NAME, processed_frame)

        if result:
            if is_camera:
                timestamp = datetime.fromtimestamp(current_time).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                print(f'{timestamp}, {result}')
            else:
                print(f'Frame {frame_count}: {result}')

            results_log.append(result)

        # 情報提供(カメラモードのみ、info_interval秒ごと)
        if is_camera:
            elapsed = current_time - last_info_time
            if elapsed >= info_interval:
                total_elapsed = current_time - start_time
                actual_fps = frame_count / total_elapsed if total_elapsed > 0 else 0
                avg_processing_time = (total_processing_time / frame_count * 1000) if frame_count > 0 else 0
                print(f'[情報] 経過時間: {total_elapsed:.1f}秒, 処理フレーム数: {frame_count}, 実測fps: {actual_fps:.1f}, 平均処理時間: {avg_processing_time:.1f}ms')
                last_info_time = current_time

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

finally:
    print('\n=== プログラム終了 ===')
    cap.release()
    cv2.destroyAllWindows()

    if results_log:
        with open('result.txt', 'w', encoding='utf-8') as f:
            f.write('=== YOLOv10物体検出結果 ===\n')
            f.write(f'処理フレーム数: {frame_count}\n')
            f.write(f'使用モデル: {MODEL_NAME}\n')
            f.write(f'モデル情報: {MODEL_INFO[model_choice]["name"]} - {MODEL_INFO[model_choice]["desc"]}\n')
            f.write(f'使用デバイス: {str(device).upper()}\n')
            if device.type == 'cuda':
                f.write(f'GPU: {torch.cuda.get_device_name(0)}\n')
            f.write(f'画像処理: CLAHE適用(YUV色空間)\n')
            f.write(f'TTA (Test Time Augmentation): {"有効" if TTA_ENABLED else "無効"}\n')
            if TTA_ENABLED:
                f.write(f'  - NMS閾値: {NMS_THRESHOLD}\n')
                f.write(f'  - 信頼度ブースト: {TTA_CONF_BOOST}\n')
            f.write(f'ByteTrack: {"有効" if USE_TRACKER else "無効"}\n')
            f.write(f'信頼度閾値: {CONF_THRESH}\n')
            f.write('\n')
            f.write('\n'.join(results_log))
        print(f'\n処理結果をresult.txtに保存しました')
        print(f'検出数: {person_count}')