YOLO11による人物検出・ByteTrackによる追跡とTTAの機能付き(ソースコードと説明と利用ガイド)
プログラム利用ガイド
1. このプログラムの利用シーン
動画ファイルやウェブカメラの映像から人物をリアルタイムで検出し、追跡するためのツールである。監視カメラの映像分析、人流計測、スポーツ映像の分析などに利用できる。
2. 主な機能
- リアルタイム人物検出: YOLO11により動画の各フレームから人物を検出する。
- 物体追跡: ByteTrackにより検出された人物に一意のIDを割り当て、フレーム間で追跡する。
- 3つの入力ソース: 動画ファイル、ウェブカメラ、サンプル動画から選択できる。
- モデル選択: 5種類のYOLO11モデル(nano、small、medium、large、extra large)から用途に応じて選択できる。
- 処理結果の保存: 検出結果をresult.txtファイルに保存する。
3. 基本的な使い方
- プログラムの起動: Pythonでプログラムを実行する。
- モデルの選択: 使用するYOLO11モデルを選択する(デフォルトはsmall)。
- 入力ソースの選択:
- 0: 動画ファイル(ファイル選択ダイアログが表示される)
- 1: ウェブカメラ
- 2: サンプル動画(自動ダウンロード)
- 検出と追跡: リアルタイムで人物の検出と追跡が行われる。
- 終了: qキーを押すとプログラムが終了し、結果がresult.txtに保存される。
4. 便利な機能
- TTA(Test Time Augmentation): 水平反転による推論結果の統合により、検出精度が向上する(デフォルトで有効)。
- CLAHE前処理: 低照度環境でもコントラスト強調により検出性能が維持される。
- ByteTrack追跡: 遮蔽された人物も追跡を継続し、ID管理が安定する。
- GPU自動選択: CUDAが利用可能な場合は自動的にGPUで高速処理される。
- 日本語表示: 検出結果が日本語で表示される。
Python開発環境,ライブラリ類
ここでは、最低限の事前準備について説明する。機械学習や深層学習を行う場合は、NVIDIA CUDA、Visual Studio、Cursorなどを追加でインストールすると便利である。これらについては別ページ https://www.kkaneko.jp/cc/dev/aiassist.htmlで詳しく解説しているので、必要に応じて参照してください。
Python 3.12 のインストール
インストール済みの場合は実行不要。
管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要である。
REM Python をシステム領域にインストール
winget install --scope machine --id Python.Python.3.12 -e --silent
REM Python のパス設定
set "PYTHON_PATH=C:\Program Files\Python312"
set "PYTHON_SCRIPTS_PATH=C:\Program Files\Python312\Scripts"
echo "%PATH%" | find /i "%PYTHON_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_PATH%" /M >nul
echo "%PATH%" | find /i "%PYTHON_SCRIPTS_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_SCRIPTS_PATH%" /M >nul【関連する外部ページ】
Python の公式ページ: https://www.python.org/
AI エディタ Windsurf のインストール
Pythonプログラムの編集・実行には、AI エディタの利用を推奨する。ここでは,Windsurfのインストールを説明する。
管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行して、Windsurfをシステム全体にインストールする。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要となる。
winget install --scope machine Codeium.Windsurf -e --silent【関連する外部ページ】
Windsurf の公式ページ: https://windsurf.com/
必要なライブラリのインストール
コマンドプロンプトを管理者として実行(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する
pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip install ultralytics opencv-python numpy pillow boxmot
YOLO11による人物検出・ByteTrackによる追跡とTTAの機能付き
概要
このプログラムは、動画の各フレームから人物を検出し、リアルタイムで追跡するシステムである。YOLO11による物体検出とByteTrackによる追跡を組み合わせ、動画ファイル、ウェブカメラ、サンプル動画の3つの入力ソースに対応する。
主要技術
YOLO11(You Only Look Once 11)
Ultralyticsが開発したリアルタイム物体検出モデルの最新版である[1]。単一ニューラルネットワークにより画像全体を一度に処理し、バウンディングボックスとクラス確率を同時に予測する。COCOデータセットで事前学習された5種類のモデル(nano、small、medium、large、extra large)から選択可能である。
ByteTrack
Zhang et al.が2022年に発表した多物体追跡アルゴリズムである[2]。カルマンフィルタによる動き予測とハンガリアンアルゴリズムによるデータアソシエーションを組み合わせる。低信頼度の検出ボックスも利用することで、遮蔽された物体の追跡を可能にする。
技術的特徴
- CLAHE前処理
Contrast Limited Adaptive Histogram Equalization(コントラスト制限適応ヒストグラム均等化)を適用する[3]。YUV色空間の輝度チャンネルに対して局所的なコントラスト強調を行い、低照度環境での検出性能を向上させる。
- TTA(Test Time Augmentation)
推論時データ拡張手法である[4]。水平反転した画像に対しても推論を実行し、Non-Maximum Suppression(NMS)により結果を統合することで、検出の信頼性を向上させる。
- GPU/CPU自動選択
PyTorchのデバイス検出機能により、CUDAが利用可能な場合はGPUで、そうでない場合はCPUで動作する。
実装の特色
- 水平反転によるTTA実装
元画像と水平反転画像をバッチ推論し、座標変換と独立したNMS閾値による統合を行う。
- カルマンフィルタベースの追跡
ByteTrackにより各検出対象に一意のIDを割り当て、フレーム間で追跡する。
- 入力ソース対応
動画ファイル選択(tkinterによるGUI)、ウェブカメラ入力、サンプル動画ダウンロードに対応する。
- 日本語表示と結果保存
Pillow(PIL)を用いて日本語フォント(meiryo.ttc)で検出結果を描画し、処理結果をテキストファイルに保存する。
参考文献
[1] Ultralytics. (2024). YOLO11. https://github.com/ultralytics/ultralytics
[2] Zhang, Y., Sun, P., Jiang, Y., Yu, D., Weng, F., Yuan, Z., Luo, P., Liu, W., & Wang, X. (2022). ByteTrack: Multi-Object Tracking by Associating Every Detection Box. In European Conference on Computer Vision (ECCV 2022). https://arxiv.org/abs/2110.06864
[3] Zuiderveld, K. (1994). Contrast Limited Adaptive Histogram Equalization. Graphics Gems IV, 474-485.
[4] Shanmugam, D., Blalock, D., Balakrishnan, G., & Guttag, J. (2020). Better Aggregation in Test-Time Augmentation. In Proceedings of the IEEE/CVF International Conference on Computer Vision (ICCV).
ソースコード
"""
プログラム名: YOLO11による人物検出・ByteTrackによる追跡とTTAの機能付き
特徴技術名: YOLO11 (Ultralytics)
出典: Ultralytics. (2024). YOLO11. GitHub. https://github.com/ultralytics/ultralytics
特徴機能: 単一ニューラルネットワークによるリアルタイム物体検出。画像全体を一度に処理し、バウンディングボックスとクラス確率を同時に予測
学習済みモデル: yolo11n/s/m/l/x.pt - YOLO11モデル(ユーザー選択)、COCOデータセット(80クラス)で事前学習済み、推論に最適化、https://github.com/ultralytics/assets/releases
特徴技術および学習済モデルの利用制限: AGPL-3.0ライセンス(オープンソース)。商用利用の場合はEnterprise License要取得(Ultralytics公式サイト参照)
方式設計:
関連利用技術:
- OpenCV: 画像・動画処理、カメラ制御
- CLAHE (Contrast Limited Adaptive Histogram Equalization): 低照度環境での画像品質向上
- PyTorch: ディープラーニングフレームワーク、GPU/CPU自動選択
- ByteTrack: カルマンフィルタとハンガリアンアルゴリズムによる高精度物体追跡(boxmotパッケージ版)
- TTA (Test Time Augmentation): 複数の画像変換で推論し結果を統合
入力と出力: 入力: 動画(ユーザは「0:動画ファイル,1:カメラ,2:サンプル動画」のメニューで選択.0:動画ファイルの場合はtkinterでファイル選択.1の場合はOpenCVでカメラが開く.2の場合はhttps://raw.githubusercontent.com/opencv/opencv/master/samples/data/vtest.aviを使用)、出力: OpenCV画面でリアルタイム表示、検出結果をresult.txtに保存
処理手順: 1.動画フレーム取得→2.CLAHE前処理→3.TTA適用→4.YOLO11推論実行→5.バウンディングボックス抽出→6.ByteTrack追跡→7.結果描画
前処理、後処理: 前処理:CLAHE適用による画像コントラスト強化、後処理:ByteTrack追跡による検出結果の安定化とID管理
追加処理: TTA - 水平反転による推論結果の統合
調整を必要とする設定値: CONF_THRESH(信頼度閾値、デフォルト0.45)- 検出感度を制御、値が低いほど多くの物体を検出、TTA_ENABLED(TTAの有効/無効)、USE_TRACKER(ByteTrackの有効/無効)
将来方策: 信頼度閾値の自動最適化 - 検出結果の時系列分析により、シーンごとに最適な閾値を動的に学習・適用する機能
その他の重要事項: personクラスのみ検出
前準備:
pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip install ultralytics opencv-python numpy pillow boxmot
"""
import cv2
import numpy as np
import torch
import torchvision
from ultralytics import YOLO
import tkinter as tk
from tkinter import filedialog
import urllib.request
import time
import sys
import io
from datetime import datetime
from PIL import Image, ImageDraw, ImageFont
from boxmot import ByteTrack
import threading
# Windows文字エンコーディング設定
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', line_buffering=True)
# GPU/CPU自動選択
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'デバイス: {str(device)}')
# GPU使用時の最適化
if device.type == 'cuda':
torch.backends.cudnn.benchmark = True
# モデル情報の構造化
MODEL_INFO = {
'n': {'name': 'Nano', 'params': '2.6M', 'mAP': '39.5%', 'desc': '最速'},
's': {'name': 'Small', 'params': '9.4M', 'mAP': '47.0%', 'desc': 'デフォルト'},
'm': {'name': 'Medium', 'params': '20.1M', 'mAP': '51.5%', 'desc': '中程度'},
'l': {'name': 'Large', 'params': '25.3M', 'mAP': '53.4%', 'desc': '高精度'},
'x': {'name': 'Extra Large', 'params': '56.9M', 'mAP': '54.7%', 'desc': '最高精度'}
}
# 調整可能な設定値
CONF_THRESH = 0.45 # 信頼度閾値 - 検出感度制御
IOU_THRESH = 0.45 # IoU閾値 - 重複除去制御
NMS_THRESHOLD = 0.6 # TTA用のNMS閾値(独立管理)
IMG_SIZE = 640 # 推論画像サイズ
CLAHE_CLIP_LIMIT = 3.0 # CLAHE制限値
CLAHE_TILE_SIZE = (8, 8) # CLAHEタイルサイズ
WINDOW_NAME = "Person Detection" # OpenCVウィンドウ名
TTA_CONF_BOOST = 0.03 # TTA使用時の信頼度ブースト値
# ByteTrackとTTAの有効/無効(ユーザー選択により設定)
TTA_ENABLED = False
USE_TRACKER = False
# CLAHEオブジェクトをグローバルスコープで一度だけ定義(AIモデルの入力用にCLAHEを適用)
clahe = cv2.createCLAHE(clipLimit=CLAHE_CLIP_LIMIT, tileGridSize=CLAHE_TILE_SIZE)
# ByteTrackトラッカー(初期化は後で行う)
tracker = None
# BGR→RGB色変換のヘルパー関数
def bgr_to_rgb(color_bgr):
"""BGRカラーをRGBカラーに変換"""
return (color_bgr[2], color_bgr[1], color_bgr[0])
# 人物検出用の色(青)
PERSON_COLOR = (255, 0, 0)
# IDから色を生成する関数
def get_color_from_id(track_id):
"""IDをハッシュ化してHSV色空間の色を生成"""
hue = int((track_id * 37) % 180)
hsv = np.uint8([[[hue, 255, 255]]])
bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)[0][0]
return (int(bgr[0]), int(bgr[1]), int(bgr[2]))
# 日本語フォント設定(フォントサイズの分離管理)
FONT_PATH = 'C:/Windows/Fonts/meiryo.ttc'
FONT_SIZE_MAIN = 16
FONT_SIZE_SMALL = 12
font_main = ImageFont.truetype(FONT_PATH, FONT_SIZE_MAIN)
font_small = ImageFont.truetype(FONT_PATH, FONT_SIZE_SMALL)
# グローバル変数
frame_count = 0
results_log = []
person_count = 0
model = None
class ThreadedVideoCapture:
"""スレッド化されたVideoCapture(常に最新フレームを取得)"""
def __init__(self, src, is_camera=False):
if is_camera:
self.cap = cv2.VideoCapture(src, cv2.CAP_DSHOW)
fourcc = cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')
self.cap.set(cv2.CAP_PROP_FOURCC, fourcc)
self.cap.set(cv2.CAP_PROP_FPS, 60)
else:
self.cap = cv2.VideoCapture(src)
self.grabbed, self.frame = self.cap.read()
self.stopped = False
self.lock = threading.Lock()
self.thread = threading.Thread(target=self.update, args=())
self.thread.daemon = True
self.thread.start()
def update(self):
"""バックグラウンドでフレームを取得し続ける"""
while not self.stopped:
grabbed, frame = self.cap.read()
with self.lock:
self.grabbed = grabbed
if grabbed:
self.frame = frame
def read(self):
"""最新フレームを返す"""
with self.lock:
return self.grabbed, self.frame.copy() if self.grabbed else (self.grabbed, None)
def isOpened(self):
return self.cap.isOpened()
def get(self, prop):
return self.cap.get(prop)
def release(self):
self.stopped = True
self.thread.join()
self.cap.release()
# ===== モデル関連の処理を集約 =====
def initialize_model(model_choice):
"""モデル初期化を集約"""
model_name = f'yolo11{model_choice}.pt'
model = YOLO(model_name)
model.to(device)
model.eval()
return model, model_name
def run_model_inference(model, frame, conf, iou, img_size, device_obj):
"""推論処理を集約"""
results = model(frame, conf=conf, iou=iou, imgsz=img_size, verbose=False, device=device_obj)
return results
def normal_inference(frame, model, conf):
"""通常の推論処理"""
results = run_model_inference(model, frame, conf, IOU_THRESH, IMG_SIZE, device)
curr_dets = []
for r in results:
if r.boxes is not None:
for box in r.boxes:
cls = int(box.cls[0].cpu().numpy())
name = model.names[cls]
if name == 'person':
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
conf_score = float(box.conf[0].cpu().numpy())
curr_dets.append({
'x1': int(x1), 'y1': int(y1),
'x2': int(x2), 'y2': int(y2),
'conf': conf_score,
'class': cls,
'name': name
})
return curr_dets
# ===== TTA機能を独立化 =====
def apply_tta_inference(frame, model, conf):
"""Test Time Augmentation (TTA)を適用した推論"""
frame_width = frame.shape[1]
# 水平反転画像を作成
flipped_frame = cv2.flip(frame, 1)
# バッチ推論(元画像と反転画像を同時に処理、デバイスを直接指定)
results = model([frame, flipped_frame], conf=conf, iou=IOU_THRESH,
imgsz=IMG_SIZE, verbose=False, device=device)
# 元画像の結果を取得
all_boxes = []
all_confs = []
all_classes = []
if results[0].boxes is not None and len(results[0].boxes) > 0:
boxes_orig = results[0].boxes.xyxy
confs_orig = results[0].boxes.conf
classes_orig = results[0].boxes.cls
# personクラスのみフィルタリング
person_mask = torch.tensor([model.names[int(c)] == 'person' for c in classes_orig])
if person_mask.any():
all_boxes.append(boxes_orig[person_mask])
all_confs.append(confs_orig[person_mask])
all_classes.append(classes_orig[person_mask])
# 反転画像の結果を取得し、座標を元に戻す
if len(results) > 1 and results[1].boxes is not None and len(results[1].boxes) > 0:
boxes_flipped = results[1].boxes.xyxy.clone()
confs_flipped = results[1].boxes.conf
classes_flipped = results[1].boxes.cls
# personクラスのみフィルタリング
person_mask = torch.tensor([model.names[int(c)] == 'person' for c in classes_flipped])
if person_mask.any():
boxes_flipped = boxes_flipped[person_mask]
confs_flipped = confs_flipped[person_mask]
classes_flipped = classes_flipped[person_mask]
# 水平反転画像での検出結果を元の画像座標系に変換
# x1, x2 の大小関係を保つ必要がある
if boxes_flipped.shape[0] > 0:
x1_flipped = boxes_flipped[:, 0].clone()
x2_flipped = boxes_flipped[:, 2].clone()
# 元の画像座標系での新しい座標
boxes_flipped[:, 0] = frame_width - 1 - x2_flipped # 新しいx1(左端)
boxes_flipped[:, 2] = frame_width - 1 - x1_flipped # 新しいx2(右端)
all_boxes.append(boxes_flipped)
all_confs.append(confs_flipped)
all_classes.append(classes_flipped)
# 結果が空の場合は空リストを返す
if len(all_boxes) == 0:
return []
# 全ての結果を結合
all_boxes = torch.cat(all_boxes, dim=0)
all_confs = torch.cat(all_confs, dim=0)
all_classes = torch.cat(all_classes, dim=0)
# 信頼度閾値でフィルタリング(NMS前に実施)
valid_indices = all_confs > conf
if valid_indices.sum() > 0:
all_boxes = all_boxes[valid_indices]
all_confs = all_confs[valid_indices]
all_classes = all_classes[valid_indices]
# torchvisionのNMSを使用
nms_indices = torchvision.ops.nms(all_boxes, all_confs, iou_threshold=NMS_THRESHOLD)
final_boxes = all_boxes[nms_indices].cpu().numpy()
final_confs = all_confs[nms_indices].cpu().numpy()
final_classes = all_classes[nms_indices].cpu().numpy()
# 結果をリスト形式に変換
detections = []
for i in range(len(final_confs)):
# TTAで検出された場合、信頼度をブースト
conf_boost = TTA_CONF_BOOST if TTA_ENABLED else 0
detections.append({
'x1': final_boxes[i][0], 'y1': final_boxes[i][1],
'x2': final_boxes[i][2], 'y2': final_boxes[i][3],
'conf': min(1.0, final_confs[i] + conf_boost),
'class': int(final_classes[i])
})
# nameフィールドを追加
for det in detections:
det['name'] = model.names[det['class']]
# 整数座標に変換
for det in detections:
det['x1'] = int(det['x1'])
det['y1'] = int(det['y1'])
det['x2'] = int(det['x2'])
det['y2'] = int(det['y2'])
return detections
return []
def apply_tta_if_enabled(frame, model, conf):
"""TTA機能を条件付きで適用"""
if not TTA_ENABLED:
return normal_inference(frame, model, conf)
return apply_tta_inference(frame, model, conf)
# ===== トラッキング機能を独立化 =====
def apply_bytetrack(detections, frame):
"""ByteTrackerを使用したトラッキング処理"""
global tracker
# 検出結果が0件でもトラッカーの状態更新と予測結果取得を行う
if len(detections) > 0:
dets_array = np.array([[d['x1'], d['y1'], d['x2'], d['y2'], d['conf'], d['class']]
for d in detections])
else:
# 検出がない場合は空の配列を渡す
dets_array = np.empty((0, 6))
# 常にトラッカーを更新し、現在のフレームでの追跡結果(または予測結果)を取得する
tracks = tracker.update(dets_array, frame)
tracked_dets = []
# tracker.updateが返す結果を処理する(検出0件でも予測結果が返る可能性がある)
if len(tracks) > 0:
for track in tracks:
if len(track) >= 7:
x1, y1, x2, y2, track_id, conf, cls = track[:7]
name = model.names[int(cls)]
tracked_dets.append({
'x1': int(x1), 'y1': int(y1),
'x2': int(x2), 'y2': int(y2),
'track_id': int(track_id),
'conf': float(conf),
'class': int(cls),
'name': name
})
return tracked_dets
def apply_tracking_if_enabled(detections, frame):
"""トラッキング機能を条件付きで適用"""
if not USE_TRACKER:
return detections
return apply_bytetrack(detections, frame)
# ===== 物体検出タスク固有の処理 =====
def process_detection_results(detections):
"""物体検出の結果処理"""
global person_count
# 人物検出数を更新
person_count += len(detections)
return detections
def draw_detection_results(frame, detections):
"""物体検出の描画処理"""
# バウンディングボックスを描画(OpenCVで)
for det in detections:
# ByteTrack有効時はIDから色を生成、無効時は固定色
if USE_TRACKER and 'track_id' in det:
box_color = get_color_from_id(det['track_id'])
else:
box_color = PERSON_COLOR
cv2.rectangle(frame, (det['x1'], det['y1']),
(det['x2'], det['y2']), box_color, 2)
# 構造化されたテキスト描画を実行
if font_main is not None:
texts_to_draw = []
for det in detections:
track_id = det.get('track_id', 0) if USE_TRACKER else 0
if USE_TRACKER and track_id > 0:
label = f"ID:{track_id} 人: {det['conf']:.2f}"
text_color = get_color_from_id(track_id)
else:
label = f"人: {det['conf']:.2f}"
text_color = PERSON_COLOR
texts_to_draw.append({
'text': label,
'org': (det['x1'], det['y1']-20),
'color': bgr_to_rgb(text_color),
'font_type': 'main'
})
frame = draw_texts_with_pillow(frame, texts_to_draw)
# 統計情報を描画
tta_status = "TTA:ON" if TTA_ENABLED else "TTA:OFF"
tracker_status = "ByteTrack:ON" if USE_TRACKER else "ByteTrack:OFF"
info_text = f"Persons: {len(detections)} | Frame: {frame_count} | {tta_status} | {tracker_status}"
cv2.putText(frame, info_text, (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
return frame
def format_detection_output(detections):
"""物体検出の出力フォーマット"""
if len(detections) == 0:
return 'count=0'
else:
parts = []
for det in detections:
x1, y1, x2, y2 = det['x1'], det['y1'], det['x2'], det['y2']
conf = det['conf']
parts.append(f'class=person,conf={conf:.3f},box=[{x1},{y1},{x2},{y2}]')
return f'count={len(detections)}; ' + ' | '.join(parts)
# ===== 共通処理関数 =====
def draw_texts_with_pillow(bgr_frame, texts):
"""テキスト描画, texts: list of dict with keys {text, org, color, font_type}"""
if font_main is None:
return bgr_frame
img_pil = Image.fromarray(cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(img_pil)
for item in texts:
text = item['text']
x, y = item['org']
color = item['color'] # RGB
font_type = item.get('font_type', 'main')
font = font_main if font_type == 'main' else font_small
draw.text((x, y), text, font=font, fill=color)
return cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
def detect_objects(frame):
"""共通の検出処理(CLAHE、推論、検出を実行)"""
global model
# AIモデルの入力用にCLAHEを適用(YUV色空間で輝度チャンネルのみ処理)
yuv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2YUV)
yuv_frame[:, :, 0] = clahe.apply(yuv_frame[:, :, 0])
enh_frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR)
# 固定信頼度閾値を使用
curr_dets = apply_tta_if_enabled(enh_frame, model, CONF_THRESH)
return curr_dets
def process_video_frame(frame, timestamp_ms, is_camera):
"""動画用ラッパー"""
# 共通の検出処理
detections = detect_objects(frame)
# トラッキングを条件付きで適用(frameを渡す)
tracked_dets = apply_tracking_if_enabled(detections, frame)
# 物体検出固有の結果処理
processed_dets = process_detection_results(tracked_dets)
# 物体検出固有の描画処理
frame = draw_detection_results(frame, processed_dets)
# 物体検出固有の出力フォーマット
result = format_detection_output(processed_dets)
return frame, result
def video_frame_processing(frame, timestamp_ms, is_camera):
"""動画フレーム処理(標準形式)"""
global frame_count
current_time = time.time()
frame_count += 1
processed_frame, result = process_video_frame(frame, timestamp_ms, is_camera)
return processed_frame, result, current_time
def display_program_header():
print('=' * 60)
print('=== YOLO11人物検出プログラム ===')
print('=' * 60)
print('概要: CLAHEとTTAを適用し、リアルタイムで人物を検出します')
print('機能: YOLO11による人物検出(personクラスのみ)')
print('技術: CLAHE (コントラスト強化) + ByteTrack による追跡 + TTA (Test Time Augmentation)')
print('操作: qキーで終了')
print('出力: 各フレームごとに処理結果を表示し、終了時にresult.txtへ保存')
print()
# プログラムヘッダー表示
display_program_header()
print("\n=== YOLO11モデル選択 ===")
print('使用するYOLO11モデルを選択してください:')
for key, info in MODEL_INFO.items():
print(f'{key}: {info["name"]} ({info["params"]} params, mAP {info["mAP"]}) - {info["desc"]}')
print()
model_choice = ''
while model_choice not in MODEL_INFO.keys():
model_choice = input("選択 (n/s/m/l/x) [デフォルト: s]: ").strip().lower()
if model_choice == '':
model_choice = 's'
break
if model_choice not in MODEL_INFO.keys():
print("無効な選択です。もう一度入力してください。")
# モデルの初期化(集約された関数を使用)
print(f"\nYOLO11モデルをロード中...")
try:
model, model_name = initialize_model(model_choice)
print(f"\n検出対象: person(人物)のみ")
print(f"モデル情報: {MODEL_INFO[model_choice]['name']} ({MODEL_INFO[model_choice]['params']} params, mAP {MODEL_INFO[model_choice]['mAP']})")
print("モデルのロード完了")
except Exception as e:
print(f"モデルのロードに失敗しました: {e}")
raise SystemExit(1)
# ByteTrackとTTAの機能選択
print("\n=== ByteTrackとTTA機能の選択 ===")
print("1: ByteTrack, TTA (Test time augmentation) 無効化")
print("2: ByteTrack, TTA (Test time augmentation) 有効化")
print()
feature_choice = ''
while feature_choice not in ['1', '2']:
feature_choice = input("選択 (1/2) [デフォルト: 1]: ").strip()
if feature_choice == '':
feature_choice = '1'
break
if feature_choice not in ['1', '2']:
print("無効な選択です。もう一度入力してください。")
if feature_choice == '2':
TTA_ENABLED = True
USE_TRACKER = True
# ByteTrackトラッカーを初期化
tracker = ByteTrack()
print("\nByteTrackとTTAを有効化しました")
else:
TTA_ENABLED = False
USE_TRACKER = False
print("\nByteTrackとTTAを無効化しました")
# TTA設定の表示
if TTA_ENABLED:
print("\nTest Time Augmentation (TTA): 有効")
print(" - 水平反転による推論結果の統合")
print(f" - 信頼度ブースト値: {TTA_CONF_BOOST}")
print(f" - NMS閾値: {NMS_THRESHOLD}")
else:
print("\nTest Time Augmentation (TTA): 無効")
# ByteTrack設定の表示
if USE_TRACKER:
print("\nByteTrack: 有効")
print(" - カルマンフィルタによる動き予測")
print(" - IDごとに異なる色でバウンディングボックスを表示")
else:
print("\nByteTrack: 無効")
# 入力選択
print("\n=== YOLO11リアルタイム人物検出 ===")
print("0: 動画ファイル")
print("1: カメラ")
print("2: サンプル動画")
choice = input("選択: ")
is_camera = (choice == '1')
if choice == '0':
# 動画ファイル → 通常のVideoCapture
root = tk.Tk()
root.withdraw()
path = filedialog.askopenfilename()
if not path:
raise SystemExit(1)
cap = cv2.VideoCapture(path)
elif choice == '1':
# カメラ → ThreadedVideoCapture
cap = ThreadedVideoCapture(0, is_camera=True)
else:
# サンプル動画 → 通常のVideoCapture
SAMPLE_URL = 'https://raw.githubusercontent.com/opencv/opencv/master/samples/data/vtest.avi'
SAMPLE_FILE = 'vtest.avi'
print('サンプル動画をダウンロード中...')
urllib.request.urlretrieve(SAMPLE_URL, SAMPLE_FILE)
cap = cv2.VideoCapture(SAMPLE_FILE)
if not cap.isOpened():
print('動画ファイル・カメラを開けませんでした')
raise SystemExit(1)
# フレームレートの取得とタイムスタンプ増分の計算
if is_camera:
actual_fps = cap.get(cv2.CAP_PROP_FPS)
print(f'カメラのfps: {actual_fps}')
timestamp_increment = int(1000 / actual_fps) if actual_fps > 0 else 33
else:
video_fps = cap.get(cv2.CAP_PROP_FPS)
timestamp_increment = int(1000 / video_fps) if video_fps > 0 else 33
# メイン処理
print('\n=== 動画処理開始 ===')
print('操作方法:')
print(' q キー: プログラム終了')
frame_count = 0
results_log = []
start_time = time.time()
last_info_time = start_time
info_interval = 10.0 # 10秒ごとに表示
timestamp_ms = 0
total_processing_time = 0.0
try:
while True:
ret, frame = cap.read()
if not ret:
break
timestamp_ms += timestamp_increment
processing_start = time.time()
processed_frame, result, current_time = video_frame_processing(frame, timestamp_ms, is_camera)
processing_time = time.time() - processing_start
total_processing_time += processing_time
cv2.imshow(WINDOW_NAME, processed_frame)
if result:
if is_camera:
timestamp = datetime.fromtimestamp(current_time).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
print(f'{timestamp}, {result}')
else:
print(f'Frame {frame_count}: {result}')
results_log.append(result)
# 情報提供(カメラモードのみ、info_interval秒ごと)
if is_camera:
elapsed = current_time - last_info_time
if elapsed >= info_interval:
total_elapsed = current_time - start_time
actual_fps = frame_count / total_elapsed if total_elapsed > 0 else 0
avg_processing_time = (total_processing_time / frame_count * 1000) if frame_count > 0 else 0
print(f'[情報] 経過時間: {total_elapsed:.1f}秒, 処理フレーム数: {frame_count}, 実測fps: {actual_fps:.1f}, 平均処理時間: {avg_processing_time:.1f}ms')
last_info_time = current_time
if cv2.waitKey(1) & 0xFF == ord('q'):
break
finally:
print('\n=== プログラム終了 ===')
cap.release()
cv2.destroyAllWindows()
if results_log:
with open('result.txt', 'w', encoding='utf-8') as f:
f.write('=== YOLO11人物検出結果 ===\n')
f.write(f'処理フレーム数: {frame_count}\n')
f.write(f'使用モデル: {model_name}\n')
f.write(f'モデル情報: {MODEL_INFO[model_choice]["name"]} ({MODEL_INFO[model_choice]["params"]} params, mAP {MODEL_INFO[model_choice]["mAP"]})\n')
f.write(f'使用デバイス: {str(device).upper()}\n')
if device.type == 'cuda':
f.write(f'GPU: {torch.cuda.get_device_name(0)}\n')
f.write(f'画像処理: CLAHE適用(YUV色空間)\n')
f.write(f'TTA (Test Time Augmentation): {"有効" if TTA_ENABLED else "無効"}\n')
if TTA_ENABLED:
f.write(f' - NMS閾値: {NMS_THRESHOLD}\n')
f.write(f' - 信頼度ブースト: {TTA_CONF_BOOST}\n')
f.write(f'ByteTrack: {"有効" if USE_TRACKER else "無効"}\n')
if USE_TRACKER:
f.write(f' - IDごとに異なる色で表示\n')
f.write(f'信頼度閾値: {CONF_THRESH}(固定値)\n')
if is_camera:
f.write('形式: タイムスタンプ, 検出結果\n')
else:
f.write('形式: フレーム番号, 検出結果\n')
f.write(f'\n検出対象: person(人物)\n')
f.write(f'検出回数: {person_count}回\n')
f.write('\n')
f.write('\n'.join(results_log))
print(f'\n処理結果をresult.txtに保存しました')
print(f'検出された人物数: {person_count}回')