RT-DETRv2 + DeepSORT による人物再識別(ソースコードと実行結果)

Python開発環境,ライブラリ類

ここでは、最低限の事前準備について説明する。機械学習や深層学習を行う場合は、NVIDIA CUDA、Visual Studio、Cursorなどを追加でインストールすると便利である。これらについては別ページ https://www.kkaneko.jp/cc/dev/aiassist.htmlで詳しく解説しているので、必要に応じて参照してください。

Python 3.12 のインストール(Windows 上) [クリックして展開]

以下のいずれかの方法で Python 3.12 をインストールする。Python がインストール済みの場合、この手順は不要である。

方法1:winget によるインストール

管理者権限コマンドプロンプトで以下を実行する。管理者権限のコマンドプロンプトを起動するには、Windows キーまたはスタートメニューから「cmd」と入力し、表示された「コマンドプロンプト」を右クリックして「管理者として実行」を選択する。

winget install --scope machine --id Python.Python.3.12 -e --silent --disable-interactivity --force --accept-source-agreements --accept-package-agreements --override "/quiet InstallAllUsers=1 PrependPath=1 Include_pip=1 Include_test=0 Include_launcher=1 InstallLauncherAllUsers=1"

--scope machine を指定することで、システム全体(全ユーザー向け)にインストールされる。このオプションの実行には管理者権限が必要である。インストール完了後、コマンドプロンプトを再起動すると PATH が自動的に設定される。

方法2:インストーラーによるインストール

  1. Python 公式サイト(https://www.python.org/downloads/)にアクセスし、「Download Python 3.x.x」ボタンから Windows 用インストーラーをダウンロードする。
  2. ダウンロードしたインストーラーを実行する。
  3. 初期画面の下部に表示される「Add python.exe to PATH」に必ずチェックを入れてから「Customize installation」を選択する。このチェックを入れ忘れると、コマンドプロンプトから python コマンドを実行できない。
  4. 「Install Python 3.xx for all users」にチェックを入れ、「Install」をクリックする。

インストールの確認

コマンドプロンプトで以下を実行する。

python --version

バージョン番号(例:Python 3.12.x)が表示されればインストール成功である。「'python' は、内部コマンドまたは外部コマンドとして認識されていません。」と表示される場合は、インストールが正常に完了していない。

AIエディタ Windsurf のインストール(Windows 上) [クリックして展開]

Pythonプログラムの編集・実行には、AIエディタの利用を推奨する。ここでは、Windsurfのインストールを説明する。Windsurf がインストール済みの場合、この手順は不要である。

管理者権限コマンドプロンプトで以下を実行する。管理者権限のコマンドプロンプトを起動するには、Windows キーまたはスタートメニューから「cmd」と入力し、表示された「コマンドプロンプト」を右クリックして「管理者として実行」を選択する。

winget install --scope machine --id Codeium.Windsurf -e --silent --disable-interactivity --force --accept-source-agreements --accept-package-agreements --custom "/SP- /SUPPRESSMSGBOXES /NORESTART /CLOSEAPPLICATIONS /DIR=""C:\Program Files\Windsurf"" /MERGETASKS=!runcode,addtopath,associatewithfiles,!desktopicon"
powershell -Command "$env:Path=[System.Environment]::GetEnvironmentVariable('Path','Machine')+';'+[System.Environment]::GetEnvironmentVariable('Path','User'); windsurf --install-extension MS-CEINTL.vscode-language-pack-ja --force; windsurf --install-extension ms-python.python --force; windsurf --install-extension Codeium.windsurfPyright --force"

--scope machine を指定することで、システム全体(全ユーザー向け)にインストールされる。このオプションの実行には管理者権限が必要である。インストール完了後、コマンドプロンプトを再起動すると PATH が自動的に設定される。

関連する外部ページ

Windsurf の公式ページ: https://windsurf.com/

必要なライブラリをシステム領域にインストール

管理者権限コマンドプロンプトで以下を実行する。管理者権限のコマンドプロンプトを起動するには、Windows キーまたはスタートメニューから「cmd」と入力し、表示された「コマンドプロンプト」を右クリックして「管理者として実行」を選択する。

pip

RT-DETRv2 + DeepSORT 人物追跡プログラム

ソースコード

# pip install torch torchvision torchaudio transformers opencv-python pillow numpy deep-sort-realtime
# RT-DETRv2 + DeepSORT 人物追跡プログラム
# 特徴技術および学習済モデルの利用制限: RT-DETRv2はApache 2.0ライセンス。DeepSORTはMITライセンス。必ず利用者自身で利用制限を確認すること。

import cv2
import torch
import numpy as np
from transformers import RTDetrV2ForObjectDetection, RTDetrImageProcessor
from PIL import Image, ImageDraw, ImageFont
import time
import urllib.request
from datetime import datetime
from deep_sort_realtime.deepsort_tracker import DeepSort
import tkinter as tk
from tkinter import filedialog

# GPU/CPU自動選択
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'デバイス: {str(device)}')
# GPU使用時の最適化
if device.type == 'cuda':
    torch.backends.cudnn.benchmark = True

# -----------------------------
# RT-DETRv2モデル設定
# -----------------------------
MODEL_NAME = 'PekingU/rtdetr_v2_r50vd'
CONF_THRESH = 0.5
PERSON_CLASS_ID = 0
PERSON_COLOR = (0, 255, 0)
FONT_PATH = 'C:/Windows/Fonts/meiryo.ttc'
FONT_SIZE = 20

model = RTDetrV2ForObjectDetection.from_pretrained(MODEL_NAME).to(device)
processor = RTDetrImageProcessor.from_pretrained(MODEL_NAME)
model.eval()
print("RT-DETRv2モデル初期化完了")

# -----------------------------
# DeepSORT設定(ReID内蔵)
# -----------------------------
deepsort = DeepSort(max_age=30, n_init=3)

# -----------------------------
# IDに基づく色生成関数
# -----------------------------
def get_color_by_id(track_id):
    """IDに基づいて一意の色を生成"""
    if isinstance(track_id, str):
        track_id = hash(track_id) % 2147483647
    elif not isinstance(track_id, int):
        track_id = int(track_id)

    np.random.seed(abs(track_id))
    color = np.random.randint(0, 255, 3)
    return (int(color[0]), int(color[1]), int(color[2]))

# -----------------------------
# ヘルパー関数
# -----------------------------
frame_count = 0
results_log = []
track_confidences = {}  # トラックIDと信頼度の対応を保持

def video_frame_processing(frame):
    global frame_count, track_confidences
    current_time = time.time()
    frame_count += 1

    # RT-DETRv2による人物検出
    frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    inputs = processor(images=frame_pil, return_tensors='pt')
    inputs = {k: v.to(device) for k, v in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)

    target_sizes = torch.tensor([frame.shape[:2]]).to(device)
    results = processor.post_process_object_detection(
        outputs, target_sizes=target_sizes, threshold=CONF_THRESH
    )[0]

    boxes = []
    scores = []
    labels = []

    if len(results['labels']) > 0:
        boxes_all = results['boxes'].cpu().numpy()
        scores_all = results['scores'].cpu().numpy()
        labels_all = results['labels'].cpu().numpy()
        person_idx = labels_all == PERSON_CLASS_ID
        boxes = boxes_all[person_idx].astype(int)
        scores = scores_all[person_idx]
        labels = labels_all[person_idx]

    # -----------------------------
    # DeepSORTで追跡・ID付与
    # -----------------------------
    tracks = []
    if len(boxes) > 0:
        detections = []
        for i, (x1, y1, x2, y2) in enumerate(boxes):
            w = x2 - x1
            h = y2 - y1
            detections.append(([x1, y1, w, h], scores[i], str(labels[i])))

        outputs_tracks = deepsort.update_tracks(detections, frame=frame)
        for track in outputs_tracks:
            if not track.is_confirmed():
                continue
            tid = track.track_id
            tlbr = track.to_ltrb()

            # 最新の検出信頼度を取得、なければ保存済みの値を使用
            conf = track.get_det_conf()
            if conf is not None:
                track_confidences[tid] = conf
            else:
                conf = track_confidences.get(tid, 0.5)

            tracks.append({'id': tid, 'box': tlbr, 'conf': conf})

    # -----------------------------
    # フォント設定と描画
    # -----------------------------
    font = ImageFont.truetype(FONT_PATH, FONT_SIZE)
    img_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(img_pil)

    # フレーム情報をPillowで描画
    draw.text((10, 30), f'Frame: {frame_count}', font=font, fill=(255, 255, 0))
    draw.text((10, 70), f'検出人数: {len(tracks)}', font=font, fill=(0, 255, 0))

    # ID情報とバウンディングボックスの描画
    for trk in tracks:
        x1, y1, x2, y2 = map(int, trk['box'])
        tid = trk['id']
        conf = trk['conf']

        # IDに応じた色を取得
        id_color = get_color_by_id(tid)

        # ID情報をテキストで表示(背景付き)
        label_text = f'ID:{tid} Conf:{conf:.2f}'
        bbox = draw.textbbox((x1+5, max(30, y1-10)), label_text, font=font)
        draw.rectangle([bbox[0]-5, bbox[1]-5, bbox[2]+5, bbox[3]+5], fill=id_color)
        draw.text((x1+5, max(30, y1-10)), label_text, font=font, fill=(255, 255, 255))

    frame = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)

    # MBR(人物の範囲)をIDに応じた色で描画
    for trk in tracks:
        x1, y1, x2, y2 = map(int, trk['box'])
        id_color = get_color_by_id(trk['id'])
        cv2.rectangle(frame, (x1, y1), (x2, y2), id_color, 3)

    # -----------------------------
    # ログ出力
    # -----------------------------
    result = f'Frame {frame_count}: {len(tracks)}人検出'
    for trk in tracks:
        result += f' | ID:{trk["id"]} Conf:{trk["conf"]:.2f}'

    return frame, result, current_time

print("0: 動画ファイル")
print("1: カメラ")
print("2: サンプル動画")

choice = input("選択: ")

if choice == '0':
    root = tk.Tk()
    root.withdraw()
    path = filedialog.askopenfilename()
    if not path:
        exit()
    cap = cv2.VideoCapture(path)
elif choice == '1':
    cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
    if not cap.isOpened():
        cap = cv2.VideoCapture(0)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
else:
    # サンプル動画ダウンロード・処理
    SAMPLE_URL = 'https://raw.githubusercontent.com/opencv/opencv/master/samples/data/vtest.avi'
    SAMPLE_FILE = 'vtest.avi'
    urllib.request.urlretrieve(SAMPLE_URL, SAMPLE_FILE)
    cap = cv2.VideoCapture(SAMPLE_FILE)

if not cap.isOpened():
    print('動画ファイル・カメラを開けませんでした')
    exit()

# メイン処理
print('\n=== 動画処理開始 ===')
print('操作方法:')
print('  q キー: プログラム終了')
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        MAIN_FUNC_DESC = "RT-DETRv2 + DeepSORT"
        processed_frame, result, current_time = video_frame_processing(frame)
        cv2.imshow(MAIN_FUNC_DESC, processed_frame)
        if choice == '1':  # カメラの場合
            print(datetime.fromtimestamp(current_time).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], result)
        else:  # 動画ファイルの場合
            print(frame_count, result)
        results_log.append(result)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    print('\n=== プログラム終了 ===')
    cap.release()
    cv2.destroyAllWindows()
    if results_log:
        with open('result.txt', 'w', encoding='utf-8') as f:
            f.write('=== 結果 ===\n')
            f.write(f'処理フレーム数: {frame_count}\n')
            f.write(f'使用デバイス: {str(device).upper()}\n')
            if device.type == 'cuda':
                f.write(f'GPU: {torch.cuda.get_device_name(0)}\n')
            f.write('\n')
            f.write('\n'.join(results_log))
        print(f'\n処理結果をresult.txtに保存しました')