MediaPipeによる物体検出・ByteTrackによる追跡とTTAの機能付き(COCO 80クラス)(ソースコードと説明と利用ガイド)
プログラム利用ガイド
1. このプログラムの利用シーン
動画ファイルやカメラ映像から人物、車両、動物などの物体を自動検出するソフトウェアである。監視カメラの映像解析、交通量調査、動画コンテンツの自動タグ付けなどに活用できる。
2. 主な機能
- 物体検出: COCO 80クラス(人、車、動物、家具など)の物体を自動認識する
- 物体追跡: ByteTrackにより各物体にIDを付与し、フレーム間で追跡する
- モデル選択: 3種類のAIモデルから用途に応じて選択できる
- EfficientDet-Lite0: バランス型
- EfficientDet-Lite2: 高精度型
- SSD MobileNetV2: 軽量高速型
- 検出結果の保存: 処理結果をresult.txtファイルに記録する
3. 基本的な使い方
- 起動とモデル選択:
プログラムを起動後、1~3の数字でAIモデルを選択し、Enterキーを押す。
- 入力ソースの選択:
0(動画ファイル)、1(カメラ)、2(サンプル動画)から選択し、Enterキーを押す。
- 検出結果の確認:
画面に検出された物体がバウンディングボックスと日本語ラベルで表示される。
- 終了方法:
映像画面を選択した状態でqキーを押す。
4. 便利な機能
- TTA機能: Test Time Augmentationにより検出精度を向上させる
- 追跡ID表示: 各物体に固有のIDが付与され、移動を追跡できる
- リアルタイム処理: CPU上で動作し、特別なGPUを必要としない
- 検出ログ: タイムスタンプ付きで検出結果が記録される
事前準備
ここでは、最低限の事前準備について説明する。機械学習や深層学習を行う場合は、NVIDIA CUDA、Visual Studio、Cursorなどを追加でインストールすると便利である。これらについては別ページ https://www.kkaneko.jp/cc/dev/aiassist.htmlで詳しく解説しているので、必要に応じて参照してください。Python 3.12 のインストール
インストール済みの場合は実行不要。
管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要である。
REM Python をシステム領域にインストール
winget install --scope machine --id Python.Python.3.12 -e --silent
REM Python のパス設定
set "PYTHON_PATH=C:\Program Files\Python312"
set "PYTHON_SCRIPTS_PATH=C:\Program Files\Python312\Scripts"
echo "%PATH%" | find /i "%PYTHON_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_PATH%" /M >nul
echo "%PATH%" | find /i "%PYTHON_SCRIPTS_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_SCRIPTS_PATH%" /M >nul【関連する外部ページ】
Python の公式ページ: https://www.python.org/
AI エディタ Windsurf のインストール
Pythonプログラムの編集・実行には、AI エディタの利用を推奨する。ここでは,Windsurfのインストールを説明する。
管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行して、Windsurfをシステム全体にインストールする。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要となる。
winget install --scope machine Codeium.Windsurf -e --silent【関連する外部ページ】
Windsurf の公式ページ: https://windsurf.com/
必要なパッケージのインストール
管理者権限でコマンドプロンプトを起動し、以下のコマンドを実行する:
pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip install mediapipe opencv-python numpy pillow boxmot
MediaPipeオブジェクト検出プログラム(COCO 80クラス)・ByteTrackによる追跡とTTAの機能付き
概要
このプログラムは、MediaPipe Object Detectorを使用して動画や画像から物体を検出する。COCOデータセットの80クラスの物体を認識し、検出した物体をバウンディングボックスで表示する[1]。
主要技術
MediaPipe Object Detector
Googleが開発した機械学習ソリューションの物体検出コンポーネント[1]。TensorFlow Liteモデルを使用し、CPU上でリアルタイム処理を実現する。
ByteTrack
カルマンフィルタとハンガリアンアルゴリズムを組み合わせた多物体追跡手法[2]。検出ボックスの関連付けにより、フレーム間での物体の同一性を保持する。
技術的特徴
- Test Time Augmentation (TTA)
水平反転画像での推論結果を統合する。信頼度ブースト値0.03を適用する。
- モデル選択機能
EfficientDet-Lite0/Lite2[4]とSSD MobileNetV2[3]から選択可能。
- Non-Maximum Suppression (NMS)
IoU閾値0.6で重複検出を除去する。
- 信頼度フィルタリング
閾値0.5以上の検出のみを採用する。
実装の特色
- 検出物体の追跡ID付与による個体識別
- 画面表示は日本語、ログ出力は英語形式
- フレームごとの処理結果表示(count=N; class=name,ID=n,conf=0.xxx,box=[x1,y1,x2,y2]形式)
参考文献
[1] Google. (2023). MediaPipe Object Detection. https://developers.google.com/mediapipe/solutions/vision/object_detector
[2] Zhang, Y., et al. (2022). ByteTrack: Multi-Object Tracking by Associating Every Detection Box. ECCV 2022. https://arxiv.org/abs/2110.06864
[3] Howard, A., et al. (2017). MobileNets: Efficient Convolutional Neural Networks for Mobile Vision Applications. https://arxiv.org/abs/1704.04861
[4] Tan, M., et al. (2020). EfficientDet: Scalable and Efficient Object Detection. CVPR 2020. https://arxiv.org/abs/1911.09070
ソースコード
# プログラム名: MediaPipeオブジェクト検出プログラム(COCO 80クラス)・ByteTrackによる追跡とTTAの機能付き
# 特徴技術名: MediaPipe
# 出典: MediaPipe Tasks - Google
# 特徴機能: MediaPipe Tasks Object Detectorによる2Dオブジェクト検出。リアルタイムで動作する軽量なオブジェクト検出
# 学習済みモデル: EfficientDet-Lite0/Lite2またはSSD MobileNetV2事前学習済みモデル(COCO 80クラス)- ユーザ選択式
# 方式設計:
# - 関連利用技術:
# - MediaPipe: Googleが開発したマルチプラットフォーム機械学習ソリューション
# - OpenCV: 画像処理、カメラ制御、描画処理、動画入出力管理
# - ByteTrack: カルマンフィルタとハンガリアンアルゴリズムによる物体追跡(boxmotパッケージ版)
# - TTA (Test Time Augmentation): 複数の画像変換で推論し結果を統合
# - 入力と出力: 入力: 動画(ユーザは「0:動画ファイル,1:カメラ,2:サンプル動画」のメニューで選択.0:動画ファイルの場合はtkinterでファイル選択.1の場合はOpenCVでカメラが開く.2の場合はhttps://raw.githubusercontent.com/opencv/opencv/master/samples/data/vtest.aviを使用)、出力: OpenCV画面でリアルタイム表示(検出したオブジェクトをバウンディングボックスで日本語ラベル表示)、各フレームごとにprint()による処理結果表示(英語形式: count=N; class=name,ID=n,conf=0.xxx,box=[x1,y1,x2,y2])、プログラム終了時にresult.txtファイルに保存
# - 処理手順: 1.モデル選択、2.フレーム取得、3.MediaPipe推論実行、4.TTA適用、5.COCO 80クラスの検出、6.信頼度閾値による選別、7.ByteTrack追跡、8.バウンディングボックス描画
# - 前処理、後処理: 前処理:MediaPipe内部で自動実行。後処理:信頼度による閾値フィルタリングを実施、ByteTrack追跡による検出結果の安定化とID管理
# - 追加処理: 検出結果の信頼度降順ソートにより重要な検出を優先表示、TTA - 水平反転による推論結果の統合
# - 調整を必要とする設定値: CONF_THRESH(オブジェクト検出信頼度閾値、デフォルト0.5)- 値を上げると誤検出が減少するが検出漏れが増加、TTA_ENABLED(TTAの有効/無効、デフォルトTrue)
# 将来方策: CONF_THRESHの動的調整機能。フレーム毎の検出数を監視し、検出数が閾値を超えた場合は信頼度を上げ、検出数が少ない場合は下げる適応的制御の実装
# 特徴技術および学習済モデルの利用制限: MediaPipe(Apache License 2.0)、boxmot(AGPL-3.0)、学習済みモデル(Apache License 2.0)。商用利用時は各ライセンスを確認すること。必ず利用者自身で利用制限を確認すること。
# その他の重要事項: Windows環境専用設計、初回実行時は学習済みモデルの自動ダウンロード
# 前準備:
# - 以下のコマンドを実行してください:
# pip install mediapipe opencv-python numpy pillow boxmot
import cv2
import tkinter as tk
from tkinter import filedialog
import os
import numpy as np
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import warnings
import time
import urllib.request
from PIL import Image, ImageDraw, ImageFont
from datetime import datetime
from boxmot import ByteTrack
import threading
warnings.filterwarnings('ignore')
# ===== 設定・定数管理 =====
# プログラム概要表示
print('=== MediaPipeオブジェクト検出プログラム ===')
print('概要: リアルタイムでオブジェクトを検出し、バウンディングボックスで表示します')
print('機能: MediaPipe Tasks Object Detectorによるオブジェクト検出(COCOデータセット80クラス)')
print('技術: TTA (Test Time Augmentation) + ByteTrack による追跡')
print('操作: qキーで終了')
print('出力: 各フレームごとの処理結果表示、終了時にresult.txt保存')
print()
# モデル選択メニュー
print('=== AIモデル選択 ===')
print('1: EfficientDet-Lite0 - COCOデータセット80クラス学習済み、バランス型')
print('2: EfficientDet-Lite2 - COCOデータセット80クラス学習済み、高精度型')
print('3: SSD MobileNetV2 - COCOデータセット80クラス学習済み、軽量型')
model_choice = input('選択 (1-3): ').strip()
# モデル選択の処理(デフォルトは1)
if model_choice == '2':
MODEL_SIZE = '2'
elif model_choice == '3':
MODEL_SIZE = 'ssd'
else:
MODEL_SIZE = '0' # デフォルト
if model_choice not in ['1', '']:
print('無効な選択です。デフォルト(EfficientDet-Lite0)を使用します。')
print()
# モデル情報
MODEL_INFO = {
'0': {
'name': 'EfficientDet-Lite0',
'desc': 'バランス型(推奨)',
'url': 'https://storage.googleapis.com/mediapipe-models/object_detector/efficientdet_lite0/int8/1/efficientdet_lite0.tflite',
'file': 'efficientdet_lite0.tflite'
},
'2': {
'name': 'EfficientDet-Lite2',
'desc': '高精度型',
'url': 'https://storage.googleapis.com/mediapipe-models/object_detector/efficientdet_lite2/int8/1/efficientdet_lite2.tflite',
'file': 'efficientdet_lite2.tflite'
},
'ssd': {
'name': 'SSD MobileNetV2',
'desc': '軽量型',
'url': 'https://storage.googleapis.com/mediapipe-models/object_detector/ssd_mobilenet_v2/float32/1/ssd_mobilenet_v2.tflite',
'file': 'ssd_mobilenet_v2.tflite'
}
}
MODEL_URL = MODEL_INFO[MODEL_SIZE]['url']
MODEL_PATH = MODEL_INFO[MODEL_SIZE]['file']
# COCO 80クラス名
COCO_CLASSES = [
'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat', 'traffic light',
'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard',
'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple',
'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone',
'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear',
'hair drier', 'toothbrush'
]
# 日本語クラス名マッピング(OpenCV表示用)
CLASS_NAMES_JP = {
'person': '人', 'bicycle': '自転車', 'car': '車', 'motorcycle': 'バイク',
'airplane': '飛行機', 'bus': 'バス', 'train': '電車', 'truck': 'トラック',
'boat': 'ボート', 'traffic light': '信号機', 'fire hydrant': '消火栓',
'stop sign': '停止標識', 'parking meter': 'パーキングメーター', 'bench': 'ベンチ',
'bird': '鳥', 'cat': '猫', 'dog': '犬', 'horse': '馬', 'sheep': '羊',
'cow': '牛', 'elephant': '象', 'bear': '熊', 'zebra': 'シマウマ', 'giraffe': 'キリン',
'backpack': 'リュック', 'umbrella': '傘', 'handbag': 'ハンドバッグ', 'tie': 'ネクタイ',
'suitcase': 'スーツケース', 'frisbee': 'フリスビー', 'skis': 'スキー板',
'snowboard': 'スノーボード', 'sports ball': 'ボール', 'kite': '凧',
'baseball bat': 'バット', 'baseball glove': 'グローブ', 'skateboard': 'スケートボード',
'surfboard': 'サーフボード', 'tennis racket': 'テニスラケット', 'bottle': 'ボトル',
'wine glass': 'ワイングラス', 'cup': 'カップ', 'fork': 'フォーク', 'knife': 'ナイフ',
'spoon': 'スプーン', 'bowl': 'ボウル', 'banana': 'バナナ', 'apple': 'リンゴ',
'sandwich': 'サンドイッチ', 'orange': 'オレンジ', 'broccoli': 'ブロッコリー',
'carrot': 'ニンジン', 'hot dog': 'ホットドッグ', 'pizza': 'ピザ', 'donut': 'ドーナツ',
'cake': 'ケーキ', 'chair': '椅子', 'couch': 'ソファ', 'potted plant': '鉢植え',
'bed': 'ベッド', 'dining table': 'テーブル', 'toilet': 'トイレ', 'tv': 'テレビ',
'laptop': 'ノートPC', 'mouse': 'マウス', 'remote': 'リモコン', 'keyboard': 'キーボード',
'cell phone': '携帯電話', 'microwave': '電子レンジ', 'oven': 'オーブン',
'toaster': 'トースター', 'sink': 'シンク', 'refrigerator': '冷蔵庫',
'book': '本', 'clock': '時計', 'vase': '花瓶', 'scissors': 'ハサミ',
'teddy bear': 'ぬいぐるみ', 'hair drier': 'ドライヤー', 'toothbrush': '歯ブラシ'
}
# クラスごとの色生成(HSVからBGRに変換)
def generate_colors(num_classes):
colors = []
for i in range(num_classes):
hue = int(180.0 * i / num_classes)
hsv = np.uint8([[[hue, 255, 255]]])
bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)[0][0]
colors.append((int(bgr[0]), int(bgr[1]), int(bgr[2])))
return colors
CLASS_COLORS = generate_colors(len(COCO_CLASSES))
RESULT_FILE = 'result.txt'
# カメラ設定
WINDOW_WIDTH = 1280 # カメラ解像度幅
WINDOW_HEIGHT = 720 # カメラ解像度高さ
# 検出パラメータ(調整可能)
CONF_THRESH = 0.5 # オブジェクト検出信頼度閾値(0.0-1.0)
TTA_ENABLED = True # TTA(Test Time Augmentation)の有効/無効
TTA_CONF_BOOST = 0.03 # TTA使用時の信頼度ブースト値
NMS_THRESHOLD = 0.6 # TTA用のNMS閾値(独立管理)
USE_TRACKER = True # トラッカーの使用有無
# テキスト描画設定(Pillow)
FONT_PATH = 'C:/Windows/Fonts/meiryo.ttc'
FONT_SIZE_LABEL = 18
FONT_SIZE_INFO = 18
FONT_LABEL = ImageFont.truetype(FONT_PATH, FONT_SIZE_LABEL)
FONT_INFO = ImageFont.truetype(FONT_PATH, FONT_SIZE_INFO)
# ウィンドウ名
WINDOW_NAME = 'MediaPipe Object Detection'
# システム初期化
print('システム初期化中...')
# モデルダウンロード
if not os.path.exists(MODEL_PATH):
print(f'{MODEL_INFO[MODEL_SIZE]["name"]}モデルをダウンロード中...')
try:
urllib.request.urlretrieve(MODEL_URL, MODEL_PATH)
print('モデルのダウンロードが完了しました')
except Exception as e:
print(f'モデルのダウンロードに失敗しました: {e}')
exit()
# MediaPipeモデル初期化
detector = None
try:
print(f'MediaPipe Object Detector {MODEL_INFO[MODEL_SIZE]["name"]}モデルを初期化中...')
base_options = python.BaseOptions(model_asset_path=MODEL_PATH)
options = vision.ObjectDetectorOptions(
base_options=base_options,
running_mode=vision.RunningMode.IMAGE,
score_threshold=CONF_THRESH
)
detector = vision.ObjectDetector.create_from_options(options)
print(f'MediaPipe Object Detector {MODEL_INFO[MODEL_SIZE]["name"]}モデルの初期化が完了しました')
print(f'モデル: {MODEL_INFO[MODEL_SIZE]["name"]} ({MODEL_INFO[MODEL_SIZE]["desc"]})')
print('検出可能クラス: COCOデータセット80クラス')
except Exception as e:
print('MediaPipe Object Detectorモデルの初期化に失敗しました')
print(f'エラー: {e}')
exit()
print('CPUモード')
# ByteTrackトラッカーを初期化
tracker = ByteTrack() if USE_TRACKER else None
# TTA設定の表示
if TTA_ENABLED:
print("\nTest Time Augmentation (TTA): 有効")
print(" - 水平反転による推論結果の統合")
print(f" - 信頼度ブースト値: {TTA_CONF_BOOST}")
print(f" - NMS閾値: {NMS_THRESHOLD}")
else:
print("\nTest Time Augmentation (TTA): 無効")
# ByteTrack設定の表示
if USE_TRACKER:
print("\nByteTrack: 有効")
print(" - カルマンフィルタによる動き予測")
print('初期化完了')
print()
# グローバル変数
frame_count = 0
results_log = []
class ThreadedVideoCapture:
"""スレッド化されたVideoCapture(常に最新フレームを取得)"""
def __init__(self, src, is_camera=False):
if is_camera:
self.cap = cv2.VideoCapture(src, cv2.CAP_DSHOW)
fourcc = cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')
self.cap.set(cv2.CAP_PROP_FOURCC, fourcc)
self.cap.set(cv2.CAP_PROP_FPS, 60)
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, WINDOW_WIDTH)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, WINDOW_HEIGHT)
else:
self.cap = cv2.VideoCapture(src)
self.grabbed, self.frame = self.cap.read()
self.stopped = False
self.lock = threading.Lock()
self.thread = threading.Thread(target=self.update, args=())
self.thread.daemon = True
self.thread.start()
def update(self):
"""バックグラウンドでフレームを取得し続ける"""
while not self.stopped:
grabbed, frame = self.cap.read()
with self.lock:
self.grabbed = grabbed
if grabbed:
self.frame = frame
def read(self):
"""最新フレームを返す"""
with self.lock:
return self.grabbed, self.frame.copy() if self.grabbed else None
def isOpened(self):
return self.cap.isOpened()
def get(self, prop):
return self.cap.get(prop)
def release(self):
self.stopped = True
self.thread.join()
self.cap.release()
def nms(boxes, scores, iou_threshold):
"""Non-Maximum Suppression"""
if len(boxes) == 0:
return []
x1 = boxes[:, 0]
y1 = boxes[:, 1]
x2 = boxes[:, 2]
y2 = boxes[:, 3]
areas = (x2 - x1) * (y2 - y1)
order = scores.argsort()[::-1]
keep = []
while order.size > 0:
i = order[0]
keep.append(i)
xx1 = np.maximum(x1[i], x1[order[1:]])
yy1 = np.maximum(y1[i], y1[order[1:]])
xx2 = np.minimum(x2[i], x2[order[1:]])
yy2 = np.minimum(y2[i], y2[order[1:]])
w = np.maximum(0.0, xx2 - xx1)
h = np.maximum(0.0, yy2 - yy1)
inter = w * h
iou = inter / (areas[i] + areas[order[1:]] - inter)
inds = np.where(iou <= iou_threshold)[0]
order = order[inds + 1]
return keep
def apply_tta_inference(frame, frame_count):
"""Test Time Augmentation (TTA)を適用した推論"""
h, w = frame.shape[:2]
# RGB変換(MediaPipeはRGBを期待)
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# 水平反転画像を作成
flipped_frame = cv2.flip(rgb_frame, 1)
# MediaPipe Image作成
mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)
mp_image_flipped = mp.Image(image_format=mp.ImageFormat.SRGB, data=flipped_frame)
# 検出実行(IMAGEモード)
detection_result = detector.detect(mp_image)
detection_result_flipped = detector.detect(mp_image_flipped)
all_boxes = []
all_scores = []
all_class_ids = []
all_class_names = []
# 元画像の結果を取得
if detection_result.detections:
for detection in detection_result.detections:
bbox = detection.bounding_box
x1 = int(bbox.origin_x)
y1 = int(bbox.origin_y)
x2 = int(bbox.origin_x + bbox.width)
y2 = int(bbox.origin_y + bbox.height)
x1 = max(0, min(w, x1))
y1 = max(0, min(h, y1))
x2 = max(0, min(w, x2))
y2 = max(0, min(h, y2))
category = detection.categories[0]
class_id = category.index if category.index is not None else 0
class_name = category.category_name if category.category_name else 'Unknown'
score = float(category.score)
all_boxes.append([x1, y1, x2, y2])
all_scores.append(score)
all_class_ids.append(class_id)
all_class_names.append(class_name)
# 反転画像の結果を取得し、座標を元に戻す
if detection_result_flipped.detections:
for detection in detection_result_flipped.detections:
bbox = detection.bounding_box
x1_flip = int(bbox.origin_x)
y1_flip = int(bbox.origin_y)
x2_flip = int(bbox.origin_x + bbox.width)
y2_flip = int(bbox.origin_y + bbox.height)
# 水平反転の座標を正しく元に戻す(x1 < x2の関係を保つ)
x1 = w - 1 - x2_flip
x2 = w - 1 - x1_flip
y1 = y1_flip
y2 = y2_flip
x1 = max(0, min(w, x1))
y1 = max(0, min(h, y1))
x2 = max(0, min(w, x2))
y2 = max(0, min(h, y2))
category = detection.categories[0]
class_id = category.index if category.index is not None else 0
class_name = category.category_name if category.category_name else 'Unknown'
score = float(category.score)
all_boxes.append([x1, y1, x2, y2])
all_scores.append(score)
all_class_ids.append(class_id)
all_class_names.append(class_name)
# 結果が空の場合は空リストを返す
if len(all_boxes) == 0:
return []
# numpy配列に変換
all_boxes = np.array(all_boxes)
all_scores = np.array(all_scores)
all_class_ids = np.array(all_class_ids)
# 信頼度閾値でフィルタリング(NMS前に実施)
valid_indices = all_scores > CONF_THRESH
if valid_indices.sum() > 0:
all_boxes = all_boxes[valid_indices]
all_scores = all_scores[valid_indices]
all_class_ids = all_class_ids[valid_indices]
all_class_names = [all_class_names[i] for i in range(len(all_class_names)) if valid_indices[i]]
# NMSを使用
nms_indices = nms(all_boxes, all_scores, NMS_THRESHOLD)
# 結果をリスト形式に変換
objects = []
for i in nms_indices:
# TTAで検出された場合、信頼度をブースト
conf_boost = TTA_CONF_BOOST if TTA_ENABLED else 0
objects.append({
'box': (int(all_boxes[i][0]), int(all_boxes[i][1]), int(all_boxes[i][2]), int(all_boxes[i][3])),
'detection_conf': min(1.0, all_scores[i] + conf_boost),
'class_id': int(all_class_ids[i]),
'class_name': all_class_names[i]
})
return objects
return []
def normal_inference(frame, frame_count):
"""通常の推論処理"""
h, w = frame.shape[:2]
# RGB変換(MediaPipeはRGBを期待)
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# MediaPipe Image作成
mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)
# 検出実行
detection_result = detector.detect(mp_image)
objects = []
if detection_result.detections:
for detection in detection_result.detections:
bbox = detection.bounding_box
x1 = int(bbox.origin_x)
y1 = int(bbox.origin_y)
x2 = int(bbox.origin_x + bbox.width)
y2 = int(bbox.origin_y + bbox.height)
x1 = max(0, min(w, x1))
y1 = max(0, min(h, y1))
x2 = max(0, min(w, x2))
y2 = max(0, min(h, y2))
category = detection.categories[0]
class_id = category.index if category.index is not None else 0
class_name = category.category_name if category.category_name else 'Unknown'
score = float(category.score)
objects.append({
'box': (x1, y1, x2, y2),
'detection_conf': score,
'class_id': class_id,
'class_name': class_name
})
return objects
def apply_tta_if_enabled(frame, frame_count):
"""TTA機能を条件付きで適用"""
if not TTA_ENABLED:
return normal_inference(frame, frame_count)
return apply_tta_inference(frame, frame_count)
def apply_bytetrack(objects, frame):
"""ByteTrackerを使用したトラッキング処理"""
global tracker
if len(objects) > 0:
dets_array = np.array([[obj['box'][0], obj['box'][1], obj['box'][2], obj['box'][3],
obj['detection_conf'], obj['class_id']] for obj in objects])
else:
dets_array = np.empty((0, 6))
tracks = tracker.update(dets_array, frame)
tracked_objects = []
if len(tracks) > 0:
for track in tracks:
if len(track) >= 7:
x1, y1, x2, y2, track_id, conf, cls = track[:7]
class_name = COCO_CLASSES[int(cls)] if int(cls) < len(COCO_CLASSES) else 'Unknown'
tracked_objects.append({
'box': (int(x1), int(y1), int(x2), int(y2)),
'track_id': int(track_id),
'detection_conf': float(conf),
'class_id': int(cls),
'class_name': class_name
})
return tracked_objects
def apply_tracking_if_enabled(objects, frame):
"""トラッキング機能を条件付きで適用"""
if not USE_TRACKER:
return objects
return apply_bytetrack(objects, frame)
def format_output_string(objects):
"""出力文字列を指定形式でフォーマット"""
if len(objects) == 0:
return "count=0"
result_parts = [f"count={len(objects)}"]
for obj in objects:
x1, y1, x2, y2 = obj['box']
class_name = obj['class_name']
conf = obj['detection_conf']
if USE_TRACKER and 'track_id' in obj:
obj_str = f"class={class_name},ID={obj['track_id']},conf={conf:.3f},box=[{x1},{y1},{x2},{y2}]"
else:
obj_str = f"class={class_name},conf={conf:.3f},box=[{x1},{y1},{x2},{y2}]"
result_parts.append(obj_str)
return "; ".join(result_parts[:1]) + (" " if len(result_parts) > 1 else "") + " | ".join(result_parts[1:])
def video_frame_processing(frame, timestamp_ms, is_camera):
"""フレーム処理メイン関数"""
global frame_count
current_time = time.time()
frame_count += 1
# 推論実行
objects = apply_tta_if_enabled(frame, frame_count)
# トラッキングを条件付きで適用
objects = apply_tracking_if_enabled(objects, frame)
# 信頼度でソート(降順)
objects.sort(key=lambda x: x['detection_conf'], reverse=True)
# 出力文字列をフォーマット
output_result = format_output_string(objects)
# 描画処理(矩形はOpenCV、テキストはPillow)
for obj in objects:
x1, y1, x2, y2 = obj['box']
class_id = obj['class_id']
color_bgr = CLASS_COLORS[class_id % len(CLASS_COLORS)] if class_id < len(CLASS_COLORS) else CLASS_COLORS[0]
cv2.rectangle(frame, (x1, y1), (x2, y2), color_bgr, 2)
# テキスト描画(Pillow、日本語)
img_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(img_pil)
for obj in objects:
x1, y1, x2, y2 = obj['box']
class_name = obj['class_name']
jp_name = CLASS_NAMES_JP.get(class_name, class_name)
conf_txt = f'Conf:{obj["detection_conf"]:.1%}'
color_bgr = CLASS_COLORS[obj['class_id'] % len(CLASS_COLORS)] if obj['class_id'] < len(CLASS_COLORS) else CLASS_COLORS[0]
color_rgb = (color_bgr[2], color_bgr[1], color_bgr[0])
if USE_TRACKER and 'track_id' in obj:
label_txt = f"ID:{obj['track_id']} {jp_name}"
else:
label_txt = jp_name
h = frame.shape[0]
draw.text((x1, max(0, y1 - 22)), label_txt, font=FONT_LABEL, fill=color_rgb)
draw.text((x1, min(y2 + 2, h - 18)), conf_txt, font=FONT_LABEL, fill=(255, 255, 255))
tta_status = "TTA:ON" if TTA_ENABLED else "TTA:OFF"
tracker_status = "ByteTrack:ON" if USE_TRACKER else "ByteTrack:OFF"
info1 = f'MediaPipe (CPU) | Frame: {frame_count} | Objects: {len(objects)} | {tta_status} | {tracker_status}'
info2 = '操作: qで終了'
draw.text((10, 10), info1, font=FONT_INFO, fill=(255, 255, 255))
draw.text((10, 36), info2, font=FONT_INFO, fill=(255, 255, 0))
frame = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
return frame, output_result, current_time
# 入力選択
print('0: 動画ファイル')
print('1: カメラ')
print('2: サンプル動画')
choice = input('選択: ')
is_camera = (choice == '1')
if choice == '0':
root = tk.Tk()
root.withdraw()
path = filedialog.askopenfilename()
if not path:
exit()
cap = cv2.VideoCapture(path)
elif choice == '1':
cap = ThreadedVideoCapture(0, is_camera=True)
else:
SAMPLE_URL = 'https://raw.githubusercontent.com/opencv/opencv/master/samples/data/vtest.avi'
SAMPLE_FILE = 'vtest.avi'
print('サンプル動画をダウンロード中...')
urllib.request.urlretrieve(SAMPLE_URL, SAMPLE_FILE)
cap = cv2.VideoCapture(SAMPLE_FILE)
if not cap.isOpened():
print('動画ファイル・カメラを開けませんでした')
exit()
# フレームレート取得とタイムスタンプ増分の計算
if is_camera:
actual_fps = cap.get(cv2.CAP_PROP_FPS)
print(f'カメラのfps: {actual_fps}')
timestamp_increment = int(1000 / actual_fps) if actual_fps > 0 else 33
else:
video_fps = cap.get(cv2.CAP_PROP_FPS)
timestamp_increment = int(1000 / video_fps) if video_fps > 0 else 33
# メイン処理
print('\n=== 動画処理開始 ===')
print('操作方法:')
print(' q キー: プログラム終了')
start_time = time.time()
last_info_time = start_time
info_interval = 10.0
timestamp_ms = 0
total_processing_time = 0.0
try:
while True:
ret, frame = cap.read()
if not ret:
break
timestamp_ms += timestamp_increment
processing_start = time.time()
processed_frame, result, current_time = video_frame_processing(frame, timestamp_ms, is_camera)
processing_time = time.time() - processing_start
total_processing_time += processing_time
cv2.imshow(WINDOW_NAME, processed_frame)
if result:
if is_camera:
timestamp = datetime.fromtimestamp(current_time).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
output_line = f"{timestamp}, {result}"
else:
output_line = f"Frame {frame_count}: {result}"
print(output_line)
results_log.append(output_line)
# 情報提供(カメラモードのみ、info_interval秒ごと)
if is_camera:
elapsed = current_time - last_info_time
if elapsed >= info_interval:
total_elapsed = current_time - start_time
actual_fps = frame_count / total_elapsed if total_elapsed > 0 else 0
avg_processing_time = (total_processing_time / frame_count * 1000) if frame_count > 0 else 0
print(f'[情報] 経過時間: {total_elapsed:.1f}秒, 処理フレーム数: {frame_count}, 実測fps: {actual_fps:.1f}, 平均処理時間: {avg_processing_time:.1f}ms')
last_info_time = current_time
if cv2.waitKey(1) & 0xFF == ord('q'):
break
finally:
print('\n=== プログラム終了 ===')
cap.release()
cv2.destroyAllWindows()
if results_log:
with open('result.txt', 'w', encoding='utf-8') as f:
f.write('=== 結果 ===\n')
f.write(f'処理フレーム数: {frame_count}\n')
f.write(f'使用モデル: {MODEL_INFO[MODEL_SIZE]["name"]}\n')
f.write(f'使用デバイス: CPU\n')
f.write(f'動作モード: IMAGE\n')
f.write(f'TTA (Test Time Augmentation): {"有効" if TTA_ENABLED else "無効"}\n')
if TTA_ENABLED:
f.write(f' - NMS閾値: {NMS_THRESHOLD}\n')
f.write(f' - 信頼度ブースト: {TTA_CONF_BOOST}\n')
f.write(f'ByteTrack: {"有効" if USE_TRACKER else "無効"}\n')
f.write('\n')
f.write('\n'.join(results_log))
print('処理結果をresult.txtに保存しました')