DPT による床面段差検出(ソースコードと実行結果)

Python開発環境,ライブラリ類
ここでは、最低限の事前準備について説明する。機械学習や深層学習を行う場合は、NVIDIA CUDA、Visual Studio、Cursorなどを追加でインストールすると便利である。これらについては別ページ https://www.kkaneko.jp/cc/dev/aiassist.htmlで詳しく解説しているので、必要に応じて参照してください。
Python 3.12 のインストール
インストール済みの場合は実行不要。
管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要である。
REM Python をシステム領域にインストール
winget install --scope machine --id Python.Python.3.12 -e --silent
REM Python のパス設定
set "PYTHON_PATH=C:\Program Files\Python312"
set "PYTHON_SCRIPTS_PATH=C:\Program Files\Python312\Scripts"
echo "%PATH%" | find /i "%PYTHON_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_PATH%" /M >nul
echo "%PATH%" | find /i "%PYTHON_SCRIPTS_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_SCRIPTS_PATH%" /M >nul
【関連する外部ページ】
Python の公式ページ: https://www.python.org/
AI エディタ Windsurf のインストール
Pythonプログラムの編集・実行には、AI エディタの利用を推奨する。ここでは,Windsurfのインストールを説明する。
管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行して、Windsurfをシステム全体にインストールする。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要となる。
winget install --scope machine Codeium.Windsurf -e --silent
【関連する外部ページ】
Windsurf の公式ページ: https://windsurf.com/
必要なライブラリのインストール
コマンドプロンプトを管理者として実行(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する
pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip install transformers opencv-python numpy pillow scipy scikit-learn
床面段差検出プログラム
概要
このプログラムは、カメラや動画から視覚情報を取得し、深度推定技術により床面の段差を検出。 視覚情報から3次元的な深度情報を推定し、床面の段差や凹凸を検出する。DPT(Dense Prediction Transformer)を用いて2次元画像から深度マップを生成し、RANSAC平面推定により床面を特定した後、その平面からの距離を計算することで段差を検出する。これにより、人間が目で見て判断するような危険箇所の認識を行う。
主要技術
- DPT (Dense Prediction Transformer)
Vision Transformerアーキテクチャを密な予測タスクに適用した深度推定モデルである。従来のCNNベースの手法と異なり、Transformerの自己注意機構により画像全体の文脈を考慮した高精度な深度推定を実現する。Intel/dpt-largeモデルは、複数の大規模データセットで事前学習されており、単一のRGB画像から相対的な深度マップを生成できる [1]。 - RANSAC (Random Sample Consensus)
外れ値に頑健な平面推定アルゴリズムである。本プログラムでは、深度マップから生成した3次元点群に対してRANSACを適用し、床面を表す平面を推定している。ランダムにサンプリングした点から平面パラメータを算出し、最も多くの点が属する平面を床面として特定する [2]。
参考文献
[1] Ranftl, R., Bochkovskiy, A., & Koltun, V. (2021). Vision transformers for dense prediction. In Proceedings of the IEEE/CVF International Conference on Computer Vision (pp. 12179-12188).
[2] Fischler, M. A., & Bolles, R. C. (1981). Random sample consensus: a paradigm for model fitting with applications to image analysis and automated cartography. Communications of the ACM, 24(6), 381-395.
ソースコード
# プログラム名: 床面段差検出プログラム - DPT深度推定による危険箇所可視化
# 特徴技術名: DPT (Dense Prediction Transformer) + CC-RANSAC
# 出典: Ranftl, R., et al. (2021). Vision Transformers for Dense Prediction. ICCV 2021.
# 特徴機能: Vision Transformerベースアーキテクチャによる高解像度単眼深度推定 + 連結成分考慮のRANSAC平面検出
# 学習済みモデル: DPT-Large、高精度な深度推定が可能なTransformerベースモデル、URL: https://huggingface.co/Intel/dpt-large
# 方式設計:
# - 関連利用技術: OpenCV (動画処理), NumPy (数値計算), Transformers (モデル読み込み), Tkinter (GUI), scikit-learn (RANSAC)
# - 入力と出力: 入力: 動画(ユーザは「0:動画ファイル,1:カメラ,2:サンプル動画」のメニューで選択)、出力: 処理結果が画像化されOpenCV画面でリアルタイム表示
# - 処理手順: RGB画像入力→DPTによる深度推定→3D点群生成→CC-RANSAC平面推定→適応的閾値による段差検出→危険領域の可視化
# - 前処理、後処理: 前処理: DPTImageProcessorによる自動前処理、後処理: エッジ保存フィルタ、適応的深度正規化
# - 追加処理: 深度ヒストグラムベースの動的閾値設定、連結成分を考慮したRANSAC、ロバストな時間的平滑化
# 修正内容:
# - DPTの深度値の正しい解釈(逆深度を深度に変換)
# - 床面検出の改善(最大の平面領域を床面として検出)
# - 閾値の自動調整機能
# 前準備:
# - pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
# - pip install transformers opencv-python numpy pillow scipy scikit-learn opencv-contrib-python
import cv2
import numpy as np
import torch
import tkinter as tk
from tkinter import filedialog
from scipy.ndimage import gaussian_filter, median_filter
from scipy.spatial.distance import cdist
from sklearn.cluster import DBSCAN
from PIL import Image
from transformers import DPTImageProcessor, DPTForDepthEstimation
import time
import os
import warnings
warnings.filterwarnings('ignore')
# プログラムの概要表示
print('=== 床面段差検出プログラム(修正版) ===')
print('DPT深度推定とCC-RANSACを使用して床面の段差や危険箇所を高精度に検出します')
print("操作方法: 'q'キーで終了, 'd'キーで深度マップ表示切替, 't'キーで閾値調整")
print('')
# 調整可能な設定値
DISTANCE_THRESHOLD = 0.03 # 平面からの距離閾値(正規化された単位)
DANGER_AREA_SIZE = 500 # 危険領域として表示する最小面積(ピクセル)
RANSAC_ITERATIONS = 100 # RANSAC反復回数
RANSAC_SAMPLE_RATIO = 0.3 # RANSACサンプリング比率
TEMPORAL_SMOOTHING = 0.8 # 時間的平滑化の重み
CONNECTIVITY_RADIUS = 30 # 連結成分の判定半径(ピクセル)
FLOOR_PERCENTILE_LOW = 0.6 # 床面検出の下限パーセンタイル
FLOOR_PERCENTILE_HIGH = 0.95 # 床面検出の上限パーセンタイル
# フィルタリング設定
BILATERAL_D = 5 # バイラテラルフィルタの直径
BILATERAL_SIGMA_COLOR = 50 # 色空間のシグマ
BILATERAL_SIGMA_SPACE = 50 # 座標空間のシグマ
MEDIAN_KERNEL_SIZE = 3 # 中央値フィルタのカーネルサイズ
# デバッグ用フラグ
SHOW_DEPTH_MAP = False # 深度マップの表示切替
# 乱数シード(再現性のため)
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
# デバイスの設定
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
# DPTモデルを使用
print('深度推定モデルを読み込み中...')
processor = DPTImageProcessor.from_pretrained('Intel/dpt-large')
model = DPTForDepthEstimation.from_pretrained(
'Intel/dpt-large',
torch_dtype=torch_dtype
).to(device)
model.eval()
print('モデルの読み込みが完了しました')
print('')
# 結果記録用
results_log = []
last_print_time = time.time()
frame_count = 0
prev_plane_params = None
depth_history = []
distance_threshold_current = DISTANCE_THRESHOLD
def simple_plane_ransac(points_3d, max_iterations=100, threshold=0.02):
"""シンプルなRANSAC平面検出(デバッグ用)"""
best_inliers = []
best_plane = None
n_points = len(points_3d)
if n_points < 100:
return None, []
for _ in range(max_iterations):
# ランダムに3点選択
sample_idx = np.random.choice(n_points, 3, replace=False)
p1, p2, p3 = points_3d[sample_idx]
# 平面の法線ベクトル計算
v1 = p2 - p1
v2 = p3 - p1
normal = np.cross(v1, v2)
if np.linalg.norm(normal) < 1e-6:
continue
normal = normal / np.linalg.norm(normal)
d = -np.dot(normal, p1)
# 距離計算
distances = np.abs(np.dot(points_3d, normal) + d)
inliers = np.where(distances < threshold)[0]
# 最も多くのインライアを持つ平面を選択
if len(inliers) > len(best_inliers):
best_inliers = inliers
best_plane = (normal, d)
return best_plane, best_inliers
def detect_floor_plane(depth_map, original_width, original_height):
"""床面平面を検出(改善版)"""
# 深度値の統計情報
valid_mask = depth_map > 0
if not np.any(valid_mask):
return None, np.array([])
# 画像の下半分を重点的にサンプリング(床面は通常下側にある)
lower_half_start = original_height // 2
lower_mask = np.zeros_like(depth_map, dtype=bool)
lower_mask[lower_half_start:, :] = True
# 有効かつ下半分の領域
floor_candidate_mask = valid_mask & lower_mask
# 深度値のパーセンタイルで床面候補を絞り込む
valid_depths = depth_map[floor_candidate_mask]
if len(valid_depths) < 100:
return None, np.array([])
# 床面は通常、ある程度の深度範囲に集中している
depth_low = np.percentile(valid_depths, FLOOR_PERCENTILE_LOW * 100)
depth_high = np.percentile(valid_depths, FLOOR_PERCENTILE_HIGH * 100)
# 深度範囲内のピクセルを床面候補とする
floor_candidate_mask = floor_candidate_mask & (depth_map >= depth_low) & (depth_map <= depth_high)
# 候補点の3D座標を生成
y_coords, x_coords = np.where(floor_candidate_mask)
z_values = depth_map[floor_candidate_mask]
# 正規化された3D座標(簡易版)
points_3d = np.column_stack([
x_coords / original_width,
y_coords / original_height,
z_values
])
# サンプリング
n_points = len(points_3d)
if n_points > 10000:
sample_idx = np.random.choice(n_points, 10000, replace=False)
points_3d_sample = points_3d[sample_idx]
else:
points_3d_sample = points_3d
# RANSAC平面検出
plane_params, inlier_indices = simple_plane_ransac(
points_3d_sample,
max_iterations=RANSAC_ITERATIONS,
threshold=0.01 # より厳しい閾値
)
if plane_params is None:
return None, np.array([])
# 全床面候補点に対してインライアを再計算
normal, d = plane_params
distances = np.abs(np.dot(points_3d, normal) + d)
floor_inliers = np.where(distances < 0.02)[0]
# インライアの画像座標
floor_pixel_indices = np.zeros(len(floor_inliers), dtype=int)
for i, idx in enumerate(floor_inliers):
y = y_coords[idx]
x = x_coords[idx]
floor_pixel_indices[i] = y * original_width + x
return plane_params, floor_pixel_indices
def video_processing(frame):
global frame_count, last_print_time, results_log, prev_plane_params, SHOW_DEPTH_MAP, distance_threshold_current
frame_count += 1
current_time = time.time()
# 元のフレームサイズを保存
original_height, original_width = frame.shape[:2]
# BGR to RGB変換
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# PIL Imageに変換
pil_image = Image.fromarray(rgb_frame)
# 深度推定の実行
inputs = processor(images=pil_image, return_tensors='pt')
inputs = {k: v.to(device) for k, v in inputs.items()}
with torch.no_grad():
outputs = model(**inputs)
depth = outputs.predicted_depth
# 深度マップをCPUに移動し、NumPy配列に変換
depth_map = depth.squeeze().cpu().numpy()
# 深度マップを元のサイズにリサイズ
depth_map = cv2.resize(
depth_map,
(original_width, original_height),
interpolation=cv2.INTER_LINEAR
)
# DPTは逆深度を出力することがあるので、値を反転
# (遠いものほど小さい値になるように)
depth_map = -depth_map
# 正規化(0-1の範囲)
depth_min = np.min(depth_map)
depth_max = np.max(depth_map)
if depth_max - depth_min > 1e-6:
depth_map = (depth_map - depth_min) / (depth_max - depth_min)
else:
depth_map = np.ones_like(depth_map) * 0.5
# ノイズ除去
depth_map = median_filter(depth_map, size=MEDIAN_KERNEL_SIZE)
depth_map = cv2.bilateralFilter(
depth_map.astype(np.float32),
BILATERAL_D,
BILATERAL_SIGMA_COLOR,
BILATERAL_SIGMA_SPACE
)
# 床面平面の検出
plane_params, floor_inliers = detect_floor_plane(
depth_map, original_width, original_height
)
# 危険領域の検出
danger_mask = np.zeros((original_height, original_width), dtype=np.uint8)
if plane_params is not None:
normal, d = plane_params
# 時間的平滑化
if prev_plane_params is not None:
prev_normal, prev_d = prev_plane_params
# 法線ベクトルの向きを揃える
if np.dot(normal, prev_normal) < 0:
normal = -normal
d = -d
smoothed_normal = TEMPORAL_SMOOTHING * prev_normal + (1 - TEMPORAL_SMOOTHING) * normal
smoothed_normal = smoothed_normal / np.linalg.norm(smoothed_normal)
smoothed_d = TEMPORAL_SMOOTHING * prev_d + (1 - TEMPORAL_SMOOTHING) * d
plane_params = (smoothed_normal, smoothed_d)
normal, d = plane_params
prev_plane_params = plane_params
# 床面マスクの作成
floor_mask = np.zeros((original_height, original_width), dtype=bool)
if len(floor_inliers) > 0:
y_indices = floor_inliers // original_width
x_indices = floor_inliers % original_width
floor_mask[y_indices, x_indices] = True
# 全ピクセルの3D座標と平面からの距離計算
yy, xx = np.meshgrid(np.arange(original_height), np.arange(original_width), indexing='ij')
points_all = np.column_stack([
xx.flatten() / original_width,
yy.flatten() / original_height,
depth_map.flatten()
])
distances = np.abs(np.dot(points_all, normal) + d).reshape(original_height, original_width)
# 床面より上にある点を危険領域として検出
# 符号付き距離を計算(床面より上がプラス)
signed_distances = np.dot(points_all, normal) + d
signed_distances = signed_distances.reshape(original_height, original_width)
# 床面から一定距離以上離れている領域を危険領域とする
danger_mask = (np.abs(signed_distances) > distance_threshold_current) & (depth_map > 0)
# 床面自体は危険領域から除外
danger_mask = danger_mask & ~floor_mask
else:
# 平面が検出できない場合は何も表示しない
danger_mask = np.zeros((original_height, original_width), dtype=bool)
# 危険領域の後処理
danger_mask = danger_mask.astype(np.uint8) * 255
# モルフォロジー処理
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
danger_mask = cv2.morphologyEx(danger_mask, cv2.MORPH_OPEN, kernel)
danger_mask = cv2.morphologyEx(danger_mask, cv2.MORPH_CLOSE, kernel)
# 輪郭検出
contours, _ = cv2.findContours(
danger_mask,
cv2.RETR_EXTERNAL,
cv2.CHAIN_APPROX_SIMPLE
)
# 危険領域のカウント
area_count = 0
total_area = 0
danger_regions = []
for contour in contours:
area = cv2.contourArea(contour)
if area > DANGER_AREA_SIZE:
area_count += 1
total_area += area
# 重心を計算
M = cv2.moments(contour)
if M["m00"] != 0:
cx = int(M["m10"] / M["m00"])
cy = int(M["m01"] / M["m00"])
danger_regions.append((cx, cy, area))
# 結果の可視化
if SHOW_DEPTH_MAP:
# 深度マップモード
depth_vis = (depth_map * 255).astype(np.uint8)
result_frame = cv2.applyColorMap(depth_vis, cv2.COLORMAP_JET)
result_frame = cv2.cvtColor(result_frame, cv2.COLOR_RGB2BGR)
# 床面領域を緑で強調
if plane_params is not None and len(floor_inliers) > 0:
floor_overlay = result_frame.copy()
y_indices = floor_inliers // original_width
x_indices = floor_inliers % original_width
floor_overlay[y_indices, x_indices] = [0, 255, 0]
cv2.addWeighted(floor_overlay, 0.3, result_frame, 0.7, 0, result_frame)
else:
# 通常モード
result_frame = frame.copy()
overlay = frame.copy()
# 危険領域を赤で塗りつぶし
for contour in contours:
area = cv2.contourArea(contour)
if area > DANGER_AREA_SIZE:
cv2.drawContours(overlay, [contour], -1, (0, 0, 255), -1)
# 半透明オーバーレイの適用
cv2.addWeighted(overlay, 0.3, result_frame, 0.7, 0, result_frame)
# 危険領域の輪郭を強調
for contour in contours:
area = cv2.contourArea(contour)
if area > DANGER_AREA_SIZE:
cv2.drawContours(result_frame, [contour], -1, (0, 0, 255), 2)
# 危険領域に警告マーカーを表示
for cx, cy, area in danger_regions:
marker_size = min(int(10 + area / 1000), 30)
cv2.drawMarker(result_frame, (cx, cy), (0, 255, 255),
cv2.MARKER_TRIANGLE_UP, marker_size, 2)
# 深度マップのサムネイル(右下)
depth_vis = (depth_map * 255).astype(np.uint8)
depth_vis_color = cv2.applyColorMap(depth_vis, cv2.COLORMAP_JET)
small_depth = cv2.resize(depth_vis_color, (200, 150))
result_frame[original_height-150:original_height, original_width-200:original_width] = small_depth
# 情報表示
info_text = f'Danger Areas: {area_count}, Total Area: {total_area:.0f}px'
cv2.putText(result_frame, info_text, (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
# 閾値情報
threshold_text = f'Distance Threshold: {distance_threshold_current:.3f}'
cv2.putText(result_frame, threshold_text, (10, 60),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 1)
# 床面検出状態
if plane_params is not None:
floor_text = f'Floor Detected: {len(floor_inliers)} points'
cv2.putText(result_frame, floor_text, (10, 90),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1)
else:
cv2.putText(result_frame, 'Floor: Not detected', (10, 90),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 1)
# 操作説明
cv2.putText(result_frame, "Press 'd' for depth, 't' to adjust threshold", (10, original_height - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
# 1秒間隔で結果を表示
if current_time - last_print_time >= 1.0:
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_time))
floor_status = "Detected" if plane_params is not None else "Not detected"
result_text = f'[{timestamp}] Frame: {frame_count}, Danger Areas: {area_count}, Total Area: {total_area:.0f}px, Floor: {floor_status}'
print(result_text)
results_log.append(result_text)
last_print_time = current_time
return result_frame
# メニュー表示
print('0: 動画ファイル')
print('1: カメラ')
print('2: サンプル動画')
choice = input('選択: ')
temp_file = None
if choice == '0':
root = tk.Tk()
root.withdraw()
path = filedialog.askopenfilename()
if not path:
exit()
cap = cv2.VideoCapture(path)
elif choice == '1':
cap = cv2.VideoCapture(0)
if not cap.isOpened():
print("カメラを開けませんでした")
exit()
elif choice == '2':
# サンプル動画ダウンロード・処理
import urllib.request
url = 'https://github.com/opencv/opencv/raw/master/samples/data/vtest.avi'
filename = 'vtest.avi'
try:
urllib.request.urlretrieve(url, filename)
temp_file = filename
cap = cv2.VideoCapture(filename)
except Exception as e:
print(f'動画のダウンロードに失敗しました: {url}')
print(f'エラー: {e}')
exit()
else:
print('無効な選択です')
exit()
# メイン処理
try:
print("\n処理を開始します...")
print("キー操作: 'q'=終了, 'd'=深度マップ表示, 't'=閾値増加, 'r'=閾値減少")
while True:
ret, frame = cap.read()
if not ret:
print("フレームの読み込みに失敗しました")
break
result = video_processing(frame)
cv2.imshow('Floor Step Detection - Fixed Version', result)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
elif key == ord('d'):
SHOW_DEPTH_MAP = not SHOW_DEPTH_MAP
mode = "Depth Map" if SHOW_DEPTH_MAP else "Normal"
print(f"表示モード: {mode}")
elif key == ord('t'):
distance_threshold_current = min(distance_threshold_current + 0.005, 0.1)
print(f"距離閾値: {distance_threshold_current:.3f}")
elif key == ord('r'):
distance_threshold_current = max(distance_threshold_current - 0.005, 0.01)
print(f"距離閾値: {distance_threshold_current:.3f}")
finally:
cap.release()
cv2.destroyAllWindows()
if temp_file and os.path.exists(temp_file):
os.remove(temp_file)
# 結果をファイルに保存
if results_log:
with open('result.txt', 'w', encoding='utf-8') as f:
f.write('=== 床面段差検出結果 ===\n')
f.write('使用技術: DPT + RANSAC\n')
f.write(f'最終距離閾値: {distance_threshold_current:.3f}\n')
f.write(f'設定値: DANGER_AREA_SIZE={DANGER_AREA_SIZE}px\n')
f.write('\n')
for line in results_log:
f.write(line + '\n')
print('\nresult.txtに保存しました')