DPT による床面段差検出(ソースコードと実行結果)

Python開発環境,ライブラリ類

ここでは、最低限の事前準備について説明する。機械学習や深層学習を行う場合は、NVIDIA CUDA、Visual Studio、Cursorなどを追加でインストールすると便利である。これらについては別ページ https://www.kkaneko.jp/cc/dev/aiassist.htmlで詳しく解説しているので、必要に応じて参照してください。

Python 3.12 のインストール

インストール済みの場合は実行不要。

管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要である。

REM Python をシステム領域にインストール
winget install --scope machine --id Python.Python.3.12 -e --silent
REM Python のパス設定
set "PYTHON_PATH=C:\Program Files\Python312"
set "PYTHON_SCRIPTS_PATH=C:\Program Files\Python312\Scripts"
echo "%PATH%" | find /i "%PYTHON_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_PATH%" /M >nul
echo "%PATH%" | find /i "%PYTHON_SCRIPTS_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_SCRIPTS_PATH%" /M >nul

関連する外部ページ

Python の公式ページ: https://www.python.org/

AI エディタ Windsurf のインストール

Pythonプログラムの編集・実行には、AI エディタの利用を推奨する。ここでは,Windsurfのインストールを説明する。

管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行して、Windsurfをシステム全体にインストールする。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要となる。

winget install --scope machine Codeium.Windsurf -e --silent

関連する外部ページ

Windsurf の公式ページ: https://windsurf.com/

必要なライブラリのインストール

コマンドプロンプトを管理者として実行(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する


pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip install transformers opencv-python numpy pillow scipy scikit-learn

床面段差検出プログラム

概要

このプログラムは、カメラや動画から視覚情報を取得し、深度推定技術により床面の段差を検出。 視覚情報から3次元的な深度情報を推定し、床面の段差や凹凸を検出する。DPT(Dense Prediction Transformer)を用いて2次元画像から深度マップを生成し、RANSAC平面推定により床面を特定した後、その平面からの距離を計算することで段差を検出する。これにより、人間が目で見て判断するような危険箇所の認識を行う。

主要技術

参考文献

[1] Ranftl, R., Bochkovskiy, A., & Koltun, V. (2021). Vision transformers for dense prediction. In Proceedings of the IEEE/CVF International Conference on Computer Vision (pp. 12179-12188).

[2] Fischler, M. A., & Bolles, R. C. (1981). Random sample consensus: a paradigm for model fitting with applications to image analysis and automated cartography. Communications of the ACM, 24(6), 381-395.

ソースコード


# プログラム名: 床面段差検出プログラム - DPT深度推定による危険箇所可視化
# 特徴技術名: DPT (Dense Prediction Transformer) + CC-RANSAC
# 出典: Ranftl, R., et al. (2021). Vision Transformers for Dense Prediction. ICCV 2021.
# 特徴機能: Vision Transformerベースアーキテクチャによる高解像度単眼深度推定 + 連結成分考慮のRANSAC平面検出
# 学習済みモデル: DPT-Large、高精度な深度推定が可能なTransformerベースモデル、URL: https://huggingface.co/Intel/dpt-large
# 方式設計:
#   - 関連利用技術: OpenCV (動画処理), NumPy (数値計算), Transformers (モデル読み込み), Tkinter (GUI), scikit-learn (RANSAC)
#   - 入力と出力: 入力: 動画(ユーザは「0:動画ファイル,1:カメラ,2:サンプル動画」のメニューで選択)、出力: 処理結果が画像化されOpenCV画面でリアルタイム表示
#   - 処理手順: RGB画像入力→DPTによる深度推定→3D点群生成→CC-RANSAC平面推定→適応的閾値による段差検出→危険領域の可視化
#   - 前処理、後処理: 前処理: DPTImageProcessorによる自動前処理、後処理: エッジ保存フィルタ、適応的深度正規化
#   - 追加処理: 深度ヒストグラムベースの動的閾値設定、連結成分を考慮したRANSAC、ロバストな時間的平滑化
# 修正内容:
#   - DPTの深度値の正しい解釈(逆深度を深度に変換)
#   - 床面検出の改善(最大の平面領域を床面として検出)
#   - 閾値の自動調整機能
# 前準備:
#   - pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
#   - pip install transformers opencv-python numpy pillow scipy scikit-learn opencv-contrib-python

import cv2
import numpy as np
import torch
import tkinter as tk
from tkinter import filedialog
from scipy.ndimage import gaussian_filter, median_filter
from scipy.spatial.distance import cdist
from sklearn.cluster import DBSCAN
from PIL import Image
from transformers import DPTImageProcessor, DPTForDepthEstimation
import time
import os
import warnings
warnings.filterwarnings('ignore')

# プログラムの概要表示
print('=== 床面段差検出プログラム(修正版) ===')
print('DPT深度推定とCC-RANSACを使用して床面の段差や危険箇所を高精度に検出します')
print("操作方法: 'q'キーで終了, 'd'キーで深度マップ表示切替, 't'キーで閾値調整")
print('')

# 調整可能な設定値
DISTANCE_THRESHOLD = 0.03    # 平面からの距離閾値(正規化された単位)
DANGER_AREA_SIZE = 500       # 危険領域として表示する最小面積(ピクセル)
RANSAC_ITERATIONS = 100      # RANSAC反復回数
RANSAC_SAMPLE_RATIO = 0.3    # RANSACサンプリング比率
TEMPORAL_SMOOTHING = 0.8     # 時間的平滑化の重み
CONNECTIVITY_RADIUS = 30     # 連結成分の判定半径(ピクセル)
FLOOR_PERCENTILE_LOW = 0.6   # 床面検出の下限パーセンタイル
FLOOR_PERCENTILE_HIGH = 0.95 # 床面検出の上限パーセンタイル

# フィルタリング設定
BILATERAL_D = 5              # バイラテラルフィルタの直径
BILATERAL_SIGMA_COLOR = 50   # 色空間のシグマ
BILATERAL_SIGMA_SPACE = 50   # 座標空間のシグマ
MEDIAN_KERNEL_SIZE = 3       # 中央値フィルタのカーネルサイズ

# デバッグ用フラグ
SHOW_DEPTH_MAP = False       # 深度マップの表示切替

# 乱数シード(再現性のため)
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

# デバイスの設定
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

# DPTモデルを使用
print('深度推定モデルを読み込み中...')
processor = DPTImageProcessor.from_pretrained('Intel/dpt-large')
model = DPTForDepthEstimation.from_pretrained(
    'Intel/dpt-large',
    torch_dtype=torch_dtype
).to(device)
model.eval()
print('モデルの読み込みが完了しました')
print('')

# 結果記録用
results_log = []
last_print_time = time.time()
frame_count = 0
prev_plane_params = None
depth_history = []
distance_threshold_current = DISTANCE_THRESHOLD


def simple_plane_ransac(points_3d, max_iterations=100, threshold=0.02):
    """シンプルなRANSAC平面検出(デバッグ用)"""
    best_inliers = []
    best_plane = None
    n_points = len(points_3d)

    if n_points < 100:
        return None, []

    for _ in range(max_iterations):
        # ランダムに3点選択
        sample_idx = np.random.choice(n_points, 3, replace=False)
        p1, p2, p3 = points_3d[sample_idx]

        # 平面の法線ベクトル計算
        v1 = p2 - p1
        v2 = p3 - p1
        normal = np.cross(v1, v2)

        if np.linalg.norm(normal) < 1e-6:
            continue

        normal = normal / np.linalg.norm(normal)
        d = -np.dot(normal, p1)

        # 距離計算
        distances = np.abs(np.dot(points_3d, normal) + d)
        inliers = np.where(distances < threshold)[0]

        # 最も多くのインライアを持つ平面を選択
        if len(inliers) > len(best_inliers):
            best_inliers = inliers
            best_plane = (normal, d)

    return best_plane, best_inliers


def detect_floor_plane(depth_map, original_width, original_height):
    """床面平面を検出(改善版)"""
    # 深度値の統計情報
    valid_mask = depth_map > 0
    if not np.any(valid_mask):
        return None, np.array([])

    # 画像の下半分を重点的にサンプリング(床面は通常下側にある)
    lower_half_start = original_height // 2
    lower_mask = np.zeros_like(depth_map, dtype=bool)
    lower_mask[lower_half_start:, :] = True

    # 有効かつ下半分の領域
    floor_candidate_mask = valid_mask & lower_mask

    # 深度値のパーセンタイルで床面候補を絞り込む
    valid_depths = depth_map[floor_candidate_mask]
    if len(valid_depths) < 100:
        return None, np.array([])

    # 床面は通常、ある程度の深度範囲に集中している
    depth_low = np.percentile(valid_depths, FLOOR_PERCENTILE_LOW * 100)
    depth_high = np.percentile(valid_depths, FLOOR_PERCENTILE_HIGH * 100)

    # 深度範囲内のピクセルを床面候補とする
    floor_candidate_mask = floor_candidate_mask & (depth_map >= depth_low) & (depth_map <= depth_high)

    # 候補点の3D座標を生成
    y_coords, x_coords = np.where(floor_candidate_mask)
    z_values = depth_map[floor_candidate_mask]

    # 正規化された3D座標(簡易版)
    points_3d = np.column_stack([
        x_coords / original_width,
        y_coords / original_height,
        z_values
    ])

    # サンプリング
    n_points = len(points_3d)
    if n_points > 10000:
        sample_idx = np.random.choice(n_points, 10000, replace=False)
        points_3d_sample = points_3d[sample_idx]
    else:
        points_3d_sample = points_3d

    # RANSAC平面検出
    plane_params, inlier_indices = simple_plane_ransac(
        points_3d_sample,
        max_iterations=RANSAC_ITERATIONS,
        threshold=0.01  # より厳しい閾値
    )

    if plane_params is None:
        return None, np.array([])

    # 全床面候補点に対してインライアを再計算
    normal, d = plane_params
    distances = np.abs(np.dot(points_3d, normal) + d)
    floor_inliers = np.where(distances < 0.02)[0]

    # インライアの画像座標
    floor_pixel_indices = np.zeros(len(floor_inliers), dtype=int)
    for i, idx in enumerate(floor_inliers):
        y = y_coords[idx]
        x = x_coords[idx]
        floor_pixel_indices[i] = y * original_width + x

    return plane_params, floor_pixel_indices


def video_processing(frame):
    global frame_count, last_print_time, results_log, prev_plane_params, SHOW_DEPTH_MAP, distance_threshold_current

    frame_count += 1
    current_time = time.time()

    # 元のフレームサイズを保存
    original_height, original_width = frame.shape[:2]

    # BGR to RGB変換
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # PIL Imageに変換
    pil_image = Image.fromarray(rgb_frame)

    # 深度推定の実行
    inputs = processor(images=pil_image, return_tensors='pt')
    inputs = {k: v.to(device) for k, v in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)
        depth = outputs.predicted_depth

    # 深度マップをCPUに移動し、NumPy配列に変換
    depth_map = depth.squeeze().cpu().numpy()

    # 深度マップを元のサイズにリサイズ
    depth_map = cv2.resize(
        depth_map,
        (original_width, original_height),
        interpolation=cv2.INTER_LINEAR
    )

    # DPTは逆深度を出力することがあるので、値を反転
    # (遠いものほど小さい値になるように)
    depth_map = -depth_map

    # 正規化(0-1の範囲)
    depth_min = np.min(depth_map)
    depth_max = np.max(depth_map)
    if depth_max - depth_min > 1e-6:
        depth_map = (depth_map - depth_min) / (depth_max - depth_min)
    else:
        depth_map = np.ones_like(depth_map) * 0.5

    # ノイズ除去
    depth_map = median_filter(depth_map, size=MEDIAN_KERNEL_SIZE)
    depth_map = cv2.bilateralFilter(
        depth_map.astype(np.float32),
        BILATERAL_D,
        BILATERAL_SIGMA_COLOR,
        BILATERAL_SIGMA_SPACE
    )

    # 床面平面の検出
    plane_params, floor_inliers = detect_floor_plane(
        depth_map, original_width, original_height
    )

    # 危険領域の検出
    danger_mask = np.zeros((original_height, original_width), dtype=np.uint8)

    if plane_params is not None:
        normal, d = plane_params

        # 時間的平滑化
        if prev_plane_params is not None:
            prev_normal, prev_d = prev_plane_params
            # 法線ベクトルの向きを揃える
            if np.dot(normal, prev_normal) < 0:
                normal = -normal
                d = -d
            smoothed_normal = TEMPORAL_SMOOTHING * prev_normal + (1 - TEMPORAL_SMOOTHING) * normal
            smoothed_normal = smoothed_normal / np.linalg.norm(smoothed_normal)
            smoothed_d = TEMPORAL_SMOOTHING * prev_d + (1 - TEMPORAL_SMOOTHING) * d
            plane_params = (smoothed_normal, smoothed_d)
            normal, d = plane_params

        prev_plane_params = plane_params

        # 床面マスクの作成
        floor_mask = np.zeros((original_height, original_width), dtype=bool)
        if len(floor_inliers) > 0:
            y_indices = floor_inliers // original_width
            x_indices = floor_inliers % original_width
            floor_mask[y_indices, x_indices] = True

        # 全ピクセルの3D座標と平面からの距離計算
        yy, xx = np.meshgrid(np.arange(original_height), np.arange(original_width), indexing='ij')
        points_all = np.column_stack([
            xx.flatten() / original_width,
            yy.flatten() / original_height,
            depth_map.flatten()
        ])

        distances = np.abs(np.dot(points_all, normal) + d).reshape(original_height, original_width)

        # 床面より上にある点を危険領域として検出
        # 符号付き距離を計算(床面より上がプラス)
        signed_distances = np.dot(points_all, normal) + d
        signed_distances = signed_distances.reshape(original_height, original_width)

        # 床面から一定距離以上離れている領域を危険領域とする
        danger_mask = (np.abs(signed_distances) > distance_threshold_current) & (depth_map > 0)

        # 床面自体は危険領域から除外
        danger_mask = danger_mask & ~floor_mask

    else:
        # 平面が検出できない場合は何も表示しない
        danger_mask = np.zeros((original_height, original_width), dtype=bool)

    # 危険領域の後処理
    danger_mask = danger_mask.astype(np.uint8) * 255

    # モルフォロジー処理
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    danger_mask = cv2.morphologyEx(danger_mask, cv2.MORPH_OPEN, kernel)
    danger_mask = cv2.morphologyEx(danger_mask, cv2.MORPH_CLOSE, kernel)

    # 輪郭検出
    contours, _ = cv2.findContours(
        danger_mask,
        cv2.RETR_EXTERNAL,
        cv2.CHAIN_APPROX_SIMPLE
    )

    # 危険領域のカウント
    area_count = 0
    total_area = 0
    danger_regions = []

    for contour in contours:
        area = cv2.contourArea(contour)
        if area > DANGER_AREA_SIZE:
            area_count += 1
            total_area += area
            # 重心を計算
            M = cv2.moments(contour)
            if M["m00"] != 0:
                cx = int(M["m10"] / M["m00"])
                cy = int(M["m01"] / M["m00"])
                danger_regions.append((cx, cy, area))

    # 結果の可視化
    if SHOW_DEPTH_MAP:
        # 深度マップモード
        depth_vis = (depth_map * 255).astype(np.uint8)
        result_frame = cv2.applyColorMap(depth_vis, cv2.COLORMAP_JET)
        result_frame = cv2.cvtColor(result_frame, cv2.COLOR_RGB2BGR)

        # 床面領域を緑で強調
        if plane_params is not None and len(floor_inliers) > 0:
            floor_overlay = result_frame.copy()
            y_indices = floor_inliers // original_width
            x_indices = floor_inliers % original_width
            floor_overlay[y_indices, x_indices] = [0, 255, 0]
            cv2.addWeighted(floor_overlay, 0.3, result_frame, 0.7, 0, result_frame)
    else:
        # 通常モード
        result_frame = frame.copy()
        overlay = frame.copy()

        # 危険領域を赤で塗りつぶし
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > DANGER_AREA_SIZE:
                cv2.drawContours(overlay, [contour], -1, (0, 0, 255), -1)

        # 半透明オーバーレイの適用
        cv2.addWeighted(overlay, 0.3, result_frame, 0.7, 0, result_frame)

        # 危険領域の輪郭を強調
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > DANGER_AREA_SIZE:
                cv2.drawContours(result_frame, [contour], -1, (0, 0, 255), 2)

        # 危険領域に警告マーカーを表示
        for cx, cy, area in danger_regions:
            marker_size = min(int(10 + area / 1000), 30)
            cv2.drawMarker(result_frame, (cx, cy), (0, 255, 255),
                          cv2.MARKER_TRIANGLE_UP, marker_size, 2)

        # 深度マップのサムネイル(右下)
        depth_vis = (depth_map * 255).astype(np.uint8)
        depth_vis_color = cv2.applyColorMap(depth_vis, cv2.COLORMAP_JET)
        small_depth = cv2.resize(depth_vis_color, (200, 150))
        result_frame[original_height-150:original_height, original_width-200:original_width] = small_depth

    # 情報表示
    info_text = f'Danger Areas: {area_count}, Total Area: {total_area:.0f}px'
    cv2.putText(result_frame, info_text, (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

    # 閾値情報
    threshold_text = f'Distance Threshold: {distance_threshold_current:.3f}'
    cv2.putText(result_frame, threshold_text, (10, 60),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 1)

    # 床面検出状態
    if plane_params is not None:
        floor_text = f'Floor Detected: {len(floor_inliers)} points'
        cv2.putText(result_frame, floor_text, (10, 90),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1)
    else:
        cv2.putText(result_frame, 'Floor: Not detected', (10, 90),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 1)

    # 操作説明
    cv2.putText(result_frame, "Press 'd' for depth, 't' to adjust threshold", (10, original_height - 10),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

    # 1秒間隔で結果を表示
    if current_time - last_print_time >= 1.0:
        timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_time))
        floor_status = "Detected" if plane_params is not None else "Not detected"
        result_text = f'[{timestamp}] Frame: {frame_count}, Danger Areas: {area_count}, Total Area: {total_area:.0f}px, Floor: {floor_status}'
        print(result_text)
        results_log.append(result_text)
        last_print_time = current_time

    return result_frame


# メニュー表示
print('0: 動画ファイル')
print('1: カメラ')
print('2: サンプル動画')

choice = input('選択: ')
temp_file = None

if choice == '0':
    root = tk.Tk()
    root.withdraw()
    path = filedialog.askopenfilename()
    if not path:
        exit()
    cap = cv2.VideoCapture(path)
elif choice == '1':
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("カメラを開けませんでした")
        exit()
elif choice == '2':
    # サンプル動画ダウンロード・処理
    import urllib.request
    url = 'https://github.com/opencv/opencv/raw/master/samples/data/vtest.avi'
    filename = 'vtest.avi'
    try:
        urllib.request.urlretrieve(url, filename)
        temp_file = filename
        cap = cv2.VideoCapture(filename)
    except Exception as e:
        print(f'動画のダウンロードに失敗しました: {url}')
        print(f'エラー: {e}')
        exit()
else:
    print('無効な選択です')
    exit()

# メイン処理
try:
    print("\n処理を開始します...")
    print("キー操作: 'q'=終了, 'd'=深度マップ表示, 't'=閾値増加, 'r'=閾値減少")

    while True:
        ret, frame = cap.read()
        if not ret:
            print("フレームの読み込みに失敗しました")
            break

        result = video_processing(frame)
        cv2.imshow('Floor Step Detection - Fixed Version', result)

        key = cv2.waitKey(1) & 0xFF
        if key == ord('q'):
            break
        elif key == ord('d'):
            SHOW_DEPTH_MAP = not SHOW_DEPTH_MAP
            mode = "Depth Map" if SHOW_DEPTH_MAP else "Normal"
            print(f"表示モード: {mode}")
        elif key == ord('t'):
            distance_threshold_current = min(distance_threshold_current + 0.005, 0.1)
            print(f"距離閾値: {distance_threshold_current:.3f}")
        elif key == ord('r'):
            distance_threshold_current = max(distance_threshold_current - 0.005, 0.01)
            print(f"距離閾値: {distance_threshold_current:.3f}")

finally:
    cap.release()
    cv2.destroyAllWindows()
    if temp_file and os.path.exists(temp_file):
        os.remove(temp_file)

    # 結果をファイルに保存
    if results_log:
        with open('result.txt', 'w', encoding='utf-8') as f:
            f.write('=== 床面段差検出結果 ===\n')
            f.write('使用技術: DPT + RANSAC\n')
            f.write(f'最終距離閾値: {distance_threshold_current:.3f}\n')
            f.write(f'設定値: DANGER_AREA_SIZE={DANGER_AREA_SIZE}px\n')
            f.write('\n')
            for line in results_log:
                f.write(line + '\n')
        print('\nresult.txtに保存しました')