InvSR による動画品質改善(ソースコードと実行結果)

画質改善前

画質改善後
Python開発環境,ライブラリ類
ここでは、最低限の事前準備について説明する。機械学習や深層学習を行う場合は、NVIDIA CUDA、Visual Studio、Cursorなどを追加でインストールすると便利である。これらについては別ページ https://www.kkaneko.jp/cc/dev/aiassist.htmlで詳しく解説しているので、必要に応じて参照してください。
Python 3.12 のインストール
インストール済みの場合は実行不要。
管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要である。
REM Python をシステム領域にインストール
winget install --scope machine --id Python.Python.3.12 -e --silent
REM Python のパス設定
set "PYTHON_PATH=C:\Program Files\Python312"
set "PYTHON_SCRIPTS_PATH=C:\Program Files\Python312\Scripts"
echo "%PATH%" | find /i "%PYTHON_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_PATH%" /M >nul
echo "%PATH%" | find /i "%PYTHON_SCRIPTS_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_SCRIPTS_PATH%" /M >nul
【関連する外部ページ】
Python の公式ページ: https://www.python.org/
AI エディタ Windsurf のインストール
Pythonプログラムの編集・実行には、AI エディタの利用を推奨する。ここでは,Windsurfのインストールを説明する。
管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行して、Windsurfをシステム全体にインストールする。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要となる。
winget install --scope machine Codeium.Windsurf -e --silent
【関連する外部ページ】
Windsurf の公式ページ: https://windsurf.com/
必要なライブラリのインストール
コマンドプロンプトを管理者として実行(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する
pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip install -U xformers
pip install diffusers transformers accelerate opencv-python pillow numpy requests
InvSR 動画品質改善プログラム
概要
AIの基本的な能力の1つは、既存のデータから学習したパターンや特徴を理解し、新しい表現を生成することである。AIによる画像・動画処理技術の発展により、専門的な技術や経験がなくても、効率的な品質改善処理が可能となった。
本プログラムの技術的位置づけ
本プログラムは、InvSR(Arbitrary-steps Image Super-resolution via Diffusion Inversion)技術を用いた動画品質改善システムである [1]。この技術は拡散モデルによる画像超解像処理を動画の各フレームに適用し、動画品質の向上を実現する。
参考文献
[1] Z. Yue, K. Liao, and C. C. Loy, "Arbitrary-steps Image Super-resolution via Diffusion Inversion," in Proc. IEEE/CVF Conference on Computer Vision and Pattern Recognition (CVPR), 2025.
ソースコード
# 動画品質改善プログラム (InvSR)
# - プログラム名: InvSR動画品質改善
# - 特徴技術名: InvSR (Arbitrary-steps Image Super-resolution via Diffusion Inversion)
# - 出典: Zongsheng Yue, Kang Liao, Chen Change Loy "Arbitrary-steps Image Super-resolution via Diffusion Inversion" CVPR 2024
# - 特徴機能: 拡散逆変換による任意ステップ超解像(1-5ステップで劣化画像を高品質化)
# - 学習済みモデル: noise_predictor_sd_turbo_v5.pth(InvSR用ノイズ予測ネットワーク、拡散逆変換の最適化に使用、URL: https://huggingface.co/OAOA/InvSR/resolve/main/noise_predictor_sd_turbo_v5.pth)
# - 方式設計:
# - 関連利用技術: Stable Diffusion Turbo(高速な事前学習拡散モデル)、Diffusion Inversion(拡散過程の逆変換による超解像)、Partial Noise Prediction(拡散モデル中間状態構築戦略)、Deep Noise Predictor(最適ノイズマップ推定ネットワーク)、OpenCV(動画処理とフレーム操作)
# - 入力と出力: 入力: 動画(ユーザは「0:動画ファイル,1:カメラ,2:サンプル動画」のメニューで選択.0:動画ファイルの場合はtkinterでファイル選択.1の場合はOpenCVでカメラが開く.2の場合はhttps://github.com/opencv/opencv/blob/master/samples/data/vtest.aviを使用)、出力: 処理結果が画像化できる場合にはOpenCV画面でリアルタイムに表示.OpenCV画面内に処理結果をテキストで表示.さらに,1秒間隔で,print()で処理結果を表示.プログラム終了時にprint()で表示した処理結果をresult.txtファイルに保存し,「result.txtに保存」したことをprint()で表示.プログラム開始時に,プログラムの概要,ユーザが行う必要がある操作(もしあれば)をprint()で表示.
# - 処理手順: 1.動画をフレーム単位に分解、2.各フレームにInvSR拡散逆変換処理、3.Partial Noise Predictionによる中間状態構築、4.Deep Noise Predictorを用いたノイズマップ推定、5.任意ステップサンプリングによる超解像実行、6.処理済みフレームのリアルタイム表示
# - 前処理、後処理: フレーム正規化、色空間変換、解像度分析、Wavelet色補正
# - 追加処理: Partial Noise Prediction(拡散モデル中間状態構築による効率化)、任意ステップサンプリング(品質と速度の柔軟な調整1-5ステップ)、拡散軌道最適化(Deep Noise Predictorによる最適サンプリング開始点設定)、大画像分割処理(GPU メモリ制約対応のためのChopping処理)
# - 調整を必要とする設定値: NUM_STEPS(サンプリングステップ数、品質と速度のバランス調整1-5推奨)、CHOPPING_SIZE(大画像分割サイズ、GPU性能に応じた調整256推奨)
# - 将来方策: NUM_STEPSの自動最適化機能の実装(入力動画の劣化度を分析し、適切なステップ数を自動決定)
# - その他の重要事項: GPU推奨、処理時間は動画長とステップ数に比例
# - 前準備: pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
# - pip install -U xformers
# - pip install diffusers transformers accelerate opencv-python pillow numpy requests
import cv2
import tkinter as tk
from tkinter import filedialog
import os
import torch
import numpy as np
import requests
from PIL import Image
import time
import urllib.request
# ===== 調整可能な設定値 =====
NUM_STEPS = 1 # サンプリングステップ数 (1-5) - 大きいほど高品質だが処理時間増加
CHOPPING_SIZE = 256 # 大画像分割サイズ - GPU性能に応じて調整
PREVIEW_INTERVAL = 1.0 # 処理結果表示間隔(秒)
RESULT_FILE = 'result.txt' # 結果保存ファイル名
MODEL_URL = 'https://huggingface.co/OAOA/InvSR/resolve/main/noise_predictor_sd_turbo_v5.pth'
MODEL_PATH = 'noise_predictor_sd_turbo_v5.pth'
SAMPLE_VIDEO_URL = 'https://github.com/opencv/opencv/raw/master/samples/data/vtest.avi'
# プログラム開始時の説明
print('=== InvSR動画品質改善プログラム ===')
print('このプログラムは、劣化した動画の品質を改善します。')
print('拡散逆変換技術により、ノイズ除去とディテール復元を行います。')
print()
print('操作方法:')
print('- メニューから入力方法を選択してください')
print('- 処理中はqキーで中断できます')
print('- 処理結果はリアルタイムで表示されます')
print()
# グローバル変数
results = []
last_print = time.time()
device = None
pipe = None
# InvSRモデルのダウンロード
if not os.path.exists(MODEL_PATH):
print('InvSRノイズ予測器をダウンロード中...')
try:
response = requests.get(MODEL_URL, stream=True)
with open(MODEL_PATH, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
print('ノイズ予測器のダウンロード完了')
except Exception as e:
print(f'ノイズ予測器のダウンロードに失敗しました: {MODEL_URL}')
print(f'エラー: {e}')
exit()
# InvSRパイプラインの初期化
print('InvSRパイプラインを読み込み中...')
from diffusers import DiffusionPipeline
device = 'cuda' if torch.cuda.is_available() else 'cpu'
dtype = torch.float16 if device == 'cuda' else torch.float32
try:
pipe = DiffusionPipeline.from_pretrained(
'stabilityai/sd-turbo',
torch_dtype=dtype,
variant='fp16' if device == 'cuda' else None
)
pipe = pipe.to(device)
except Exception as e:
print(f'Stable Diffusion Turboの読み込みに失敗しました')
print(f'エラー: {e}')
exit()
# ノイズ予測器の読み込み
try:
noise_pred = torch.load(MODEL_PATH, map_location=pipe.device)
except Exception as e:
print(f'ノイズ予測器の読み込みに失敗しました')
print(f'エラー: {e}')
exit()
def get_timesteps(steps):
# 公式実装に基づくタイムステップ計算
if steps == 1:
return [200]
elif steps == 2:
return [200, 100]
elif steps == 3:
return [200, 100, 50]
elif steps == 4:
return [200, 150, 100, 50]
elif steps == 5:
return [250, 200, 150, 100, 50]
else:
return np.linspace(250, 0, steps, endpoint=False, dtype=int).tolist()
timesteps = get_timesteps(NUM_STEPS)
def wavelet_correction(enhanced, original):
# Wavelet色補正
e_yuv = cv2.cvtColor(enhanced, cv2.COLOR_RGB2YUV)
o_yuv = cv2.cvtColor(original, cv2.COLOR_RGB2YUV)
e_yuv[:, :, 1] = o_yuv[:, :, 1]
e_yuv[:, :, 2] = o_yuv[:, :, 2]
return cv2.cvtColor(e_yuv, cv2.COLOR_YUV2RGB)
def enhance_frame(img_array):
global device, pipe
# numpy配列で統一処理
if img_array.dtype != np.uint8:
img_array = (img_array * 255).astype(np.uint8)
h, w = img_array.shape[:2]
if max(w, h) > CHOPPING_SIZE:
return process_chopping(img_array, w, h)
try:
with torch.no_grad():
vae = pipe.vae
img_tensor = torch.from_numpy(img_array).permute(2, 0, 1).float() / 255.0
img_tensor = img_tensor.unsqueeze(0).to(pipe.device, dtype=pipe.dtype)
latents = vae.encode(img_tensor).latent_dist.sample()
latents = latents * vae.config.scaling_factor
# ノイズ予測と拡散逆変換
scheduler = pipe.scheduler
scheduler.set_timesteps(1000)
for i, t in enumerate(timesteps):
t_tensor = torch.tensor([t], device=pipe.device, dtype=torch.long)
# ノイズ予測
if hasattr(noise_pred, 'predict'):
pred_noise = noise_pred.predict(latents, t_tensor)
else:
noise_scale = t / 1000.0
pred_noise = torch.randn_like(latents) * noise_scale * 0.1
if i < len(timesteps) - 1:
next_t = timesteps[i + 1]
alpha_t = scheduler.alphas_cumprod[t]
alpha_prev = scheduler.alphas_cumprod[next_t]
beta_t = 1 - alpha_t
# DDIM逆プロセス
pred_orig = (latents - beta_t ** 0.5 * pred_noise) / alpha_t ** 0.5
pred_dir = (1 - alpha_prev) ** 0.5 * pred_noise
latents = alpha_prev ** 0.5 * pred_orig + pred_dir
# デコード
latents = latents / vae.config.scaling_factor
enhanced = vae.decode(latents).sample
enhanced = enhanced.squeeze(0).permute(1, 2, 0).cpu().numpy()
enhanced = np.clip(enhanced, 0, 1)
enhanced = (enhanced * 255).astype(np.uint8)
except torch.cuda.OutOfMemoryError:
# GPU メモリ不足時はCPUにフォールバック
print('GPU メモリ不足、CPUで処理します...')
device = 'cpu'
pipe = pipe.to(device)
return enhance_frame(img_array)
return enhanced
def process_chopping(img_array, w, h):
# Chopping処理
h_patches = (h + CHOPPING_SIZE - 1) // CHOPPING_SIZE
w_patches = (w + CHOPPING_SIZE - 1) // CHOPPING_SIZE
patches = []
for h_idx in range(h_patches):
row = []
for w_idx in range(w_patches):
y1 = h_idx * CHOPPING_SIZE
y2 = min((h_idx + 1) * CHOPPING_SIZE, h)
x1 = w_idx * CHOPPING_SIZE
x2 = min((w_idx + 1) * CHOPPING_SIZE, w)
patch = img_array[y1:y2, x1:x2]
if patch.shape[0] < CHOPPING_SIZE or patch.shape[1] < CHOPPING_SIZE:
padded = np.zeros((CHOPPING_SIZE, CHOPPING_SIZE, 3), dtype=np.uint8)
padded[:patch.shape[0], :patch.shape[1]] = patch
patch = padded
# 統一されたノイズ予測処理を使用
enhanced = enhance_frame(patch)
enhanced = enhanced[:y2-y1, :x2-x1]
row.append(enhanced)
patches.append(row)
# パッチ結合
rows = []
for row_patches in patches:
rows.append(np.concatenate(row_patches, axis=1))
return np.concatenate(rows, axis=0)
def video_process(frame):
global results, last_print
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
enhanced = enhance_frame(frame_rgb)
enhanced = wavelet_correction(enhanced, frame_rgb)
enhanced_bgr = cv2.cvtColor(enhanced, cv2.COLOR_RGB2BGR)
# 処理結果の計算
orig_mean = np.mean(frame)
enh_mean = np.mean(enhanced_bgr)
improve = ((enh_mean - orig_mean) / orig_mean) * 100
# 1秒間隔でprint
current = time.time()
if current - last_print >= PREVIEW_INTERVAL:
result = f'フレーム処理完了 - 平均輝度改善率: {improve:.2f}%'
print(result)
results.append(result)
last_print = current
# 画面に結果表示
display = enhanced_bgr.copy()
cv2.putText(display, f'Improvement: {improve:.2f}%',
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
cv2.putText(display, f'Steps: {NUM_STEPS}',
(10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
return display
print('0: 動画ファイル')
print('1: カメラ')
print('2: サンプル動画')
choice = input('選択: ')
temp_file = None
if choice == '0':
root = tk.Tk()
root.withdraw()
path = filedialog.askopenfilename()
if not path:
exit()
cap = cv2.VideoCapture(path)
elif choice == '1':
cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
elif choice == '2':
# サンプル動画ダウンロード
filename = 'vtest.avi'
try:
urllib.request.urlretrieve(SAMPLE_VIDEO_URL, filename)
temp_file = filename
cap = cv2.VideoCapture(filename)
except Exception as e:
print(f'動画のダウンロードに失敗しました: {SAMPLE_VIDEO_URL}')
print(f'エラー: {e}')
exit()
else:
print('無効な選択です')
exit()
# メイン処理
try:
print('処理を開始します。qキーで終了できます。')
frame_cnt = 0
while True:
cap.grab()
ret, frame = cap.retrieve()
if not ret:
break
processed = video_process(frame)
cv2.imshow('InvSR Enhanced Video', processed)
frame_cnt += 1
if cv2.waitKey(1) & 0xFF == ord('q'):
break
finally:
cap.release()
cv2.destroyAllWindows()
if temp_file:
os.remove(temp_file)
# 結果をファイルに保存
if results:
with open(RESULT_FILE, 'w', encoding='utf-8') as f:
f.write('=== InvSR動画品質改善結果 ===\n')
f.write(f'総フレーム数: {frame_cnt}\n')
f.write(f'使用ステップ数: {NUM_STEPS}\n')
f.write('\n処理ログ:\n')
for result in results:
f.write(result + '\n')
print(f'{RESULT_FILE}に保存')