Zero-DCE++ による夜間・暗所動画画質改善(ソースコードと実行結果)


画質改善前

画質改善後

Python開発環境,ライブラリ類

ここでは、最低限の事前準備について説明する。機械学習や深層学習を行う場合は、NVIDIA CUDA、Visual Studio、Cursorなどを追加でインストールすると便利である。これらについては別ページ https://www.kkaneko.jp/cc/dev/aiassist.htmlで詳しく解説しているので、必要に応じて参照してください。

Python 3.12 のインストール

インストール済みの場合は実行不要。

管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要である。

REM Python をシステム領域にインストール
winget install --scope machine --id Python.Python.3.12 -e --silent
REM Python のパス設定
set "PYTHON_PATH=C:\Program Files\Python312"
set "PYTHON_SCRIPTS_PATH=C:\Program Files\Python312\Scripts"
echo "%PATH%" | find /i "%PYTHON_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_PATH%" /M >nul
echo "%PATH%" | find /i "%PYTHON_SCRIPTS_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_SCRIPTS_PATH%" /M >nul

関連する外部ページ

Python の公式ページ: https://www.python.org/

AI エディタ Windsurf のインストール

Pythonプログラムの編集・実行には、AI エディタの利用を推奨する。ここでは,Windsurfのインストールを説明する。

管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行して、Windsurfをシステム全体にインストールする。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要となる。

winget install --scope machine Codeium.Windsurf -e --silent

関連する外部ページ

Windsurf の公式ページ: https://windsurf.com/

Gitのインストール

管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要となる。


REM Git をシステム領域にインストール
winget install --scope machine --id Git.Git -e --silent
REM Git のパス設定
set "GIT_PATH=C:\Program Files\Git\cmd"
if exist "%GIT_PATH%" (
    echo "%PATH%" | find /i "%GIT_PATH%" >nul
    if errorlevel 1 setx PATH "%PATH%;%GIT_PATH%" /M >nul
)

必要なライブラリのインストール

コマンドプロンプトを管理者として実行(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する


pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip install opencv-python numpy gitpython
git clone https://github.com/Li-Chongyi/Zero-DCE_extension.git

夜間・暗所動画画質改善プログラム

AIの基本的な能力と動画編集応用

AIの基本的な能力の1つは、既存のデータから学習したパターンや特徴を理解し、新しい表現を生成することである。AIによる画像・動画処理技術の発展により、専門的な技術や経験がなくても、暗所や夜間撮影された動画の効率的な画質改善処理が可能となった。

本プログラムの技術的位置づけ

本プログラムは、Zero-DCE++(Zero-Reference Deep Curve Estimation)技術を用いた低照度動画画質改善システムである。Zero-DCE++は、参照画像を必要としないゼロ参照学習により、深層曲線推定を用いて画像の明度を調整する技術である。

プログラムは以下の技術を使用している。

参考文献

[1] C. Li, C. Guo, C. C. Loy, "Learning to Enhance Low-Light Image via Zero-Reference Deep Curve Estimation," IEEE Transactions on Pattern Analysis and Machine Intelligence, vol. 44, no. 8, pp. 4225-4238, 2022.

ソースコード


# 夜間・暗所動画画質改善プログラム
# - 特徴技術名: Zero-DCE++ (Zero-Reference Deep Curve Estimation)
# - 出典: https://github.com/Li-Chongyi/Zero-DCE_extension
# - 特徴機能: ゼロ参照深層曲線推定による低照度動画の自然な画質改善。明暗混在シーンでも局所的な明度調整により自然な結果を実現
# - AI学習済みモデル: Zero-DCE++事前学習モデル、低照度画像改善に特化、曲線推定により自然な明度調整を実現、https://github.com/Li-Chongyi/Zero-DCE_extension
# - 入力と出力: 入力: 動画ファイル、出力: 画質改善された動画ファイル
# - 前準備: pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
#   pip install opencv-python numpy gitpython
#   git clone https://github.com/Li-Chongyi/Zero-DCE_extension.git
# - 方式設計:
#   - 関連利用技術: OpenCV(動画処理、フレーム抽出・結合)、PyTorch(深層学習フレームワーク、モデル実行)、CLAHE(Contrast Limited Adaptive Histogram Equalization、局所的コントラスト改善)
#   - 処理手順: 1.動画の明るさ分析による自動パラメータ調整、2.動画からフレーム抽出、3.解像度調整、4.バッチ処理でZero-DCE++モデル適用、5.ガンマ補正と色彩保持処理、6.CLAHEによる局所コントラスト調整、7.元解像度に復元、8.処理済みフレームを動画に再構成
#   - 前処理、後処理: 前処理:RGB正規化(0-1範囲)、動的解像度調整、後処理:ガンマ補正による微調整、色彩保持処理による自然さ向上、CLAHEによる局所コントラスト改善
#   - 追加処理: LAB色空間でのCLAHE処理により明暗混在部分の自然な改善、元画像との重み付け合成により色彩を保持、動画全体の明るさ分析による自動パラメータ最適化
#   - 調整を必要とする設定値: BATCH_SIZE(バッチサイズ、現在4、範囲1-8、メモリと速度のバランス)、MAX_RESOLUTION(最大解像度、現在1920、範囲720-3840、処理可能な最大解像度)。将来的に使用可能メモリ量の自動検出によりBATCH_SIZEを動的に最適化可能

import os
import subprocess
import sys

import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

# 調整可能な設定値
BATCH_SIZE = 4  # バッチサイズ(1-8):メモリと速度のバランス
MAX_RESOLUTION = 1920  # 最大解像度(720-3840):処理可能な最大解像度

# モデル関連設定
MODEL_PATH = "Zero-DCE_extension/Zero-DCE++/snapshots_Zero_DCE++/Epoch99.pth"
REPO_URL = "https://github.com/Li-Chongyi/Zero-DCE_extension.git"


# 深さ方向分離畳み込み層(修正版)
class DepthwiseSeparableConv(nn.Module):
    def __init__(self, in_channels, out_channels, mid_channels=None, kernel_size=3, stride=1, padding=1):
        super(DepthwiseSeparableConv, self).__init__()
        if mid_channels is None:
            mid_channels = in_channels
        self.depth_conv = nn.Conv2d(in_channels, mid_channels, kernel_size, stride, padding, groups=in_channels, bias=True)
        self.point_conv = nn.Conv2d(mid_channels, out_channels, 1, 1, 0, bias=True)

    def forward(self, x):
        x = self.depth_conv(x)
        x = self.point_conv(x)
        return x


# Zero-DCE++モデル定義(公式実装準拠)
class DCENet(nn.Module):
    def __init__(self, n_curves=1):
        super(DCENet, self).__init__()
        self.n_curves = n_curves

        # エンコーダー部分(深さ方向分離畳み込み)
        self.e_conv1 = DepthwiseSeparableConv(3, 32, 3, 3, 1, 1)
        self.e_conv2 = DepthwiseSeparableConv(32, 32, 32, 3, 1, 1)
        self.e_conv3 = DepthwiseSeparableConv(32, 32, 32, 3, 1, 1)
        self.e_conv4 = DepthwiseSeparableConv(32, 32, 32, 3, 1, 1)
        self.e_conv5 = DepthwiseSeparableConv(32, 32, 64, 3, 1, 1)
        self.e_conv6 = DepthwiseSeparableConv(32, 32, 64, 3, 1, 1)
        self.e_conv7 = DepthwiseSeparableConv(32, 3, 64, 3, 1, 1)

    def forward(self, x):
        # エンコーダー
        e1 = F.relu(self.e_conv1(x))
        e2 = F.relu(self.e_conv2(e1))
        e3 = F.relu(self.e_conv3(e2))
        e4 = F.relu(self.e_conv4(e3))
        e5 = F.relu(self.e_conv5(e4))
        e6 = F.relu(self.e_conv6(e5))
        A = torch.tanh(self.e_conv7(e6))

        # 曲線推定による画像改善
        enhanced = x
        for i in range(self.n_curves):
            enhanced = enhanced + A * (torch.pow(enhanced, 2) - enhanced)

        return enhanced, A


# バッチ処理関数
def process_batch(resized_buffer, original_sizes, model, device, gamma_value, color_preserve_ratio):
    # バッチ処理
    normalized_frames = []
    for resized_frame in resized_buffer:
        frame_rgb = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGB)
        frame_normalized = frame_rgb.astype(np.float32) / 255.0
        normalized_frames.append(frame_normalized)

    batch_tensor = torch.stack([
        torch.from_numpy(nf).permute(2, 0, 1) for nf in normalized_frames
    ]).to(device)

    with torch.no_grad():
        enhanced_batch, _ = model(batch_tensor)
        enhanced_batch = torch.clamp(enhanced_batch, 0, 1)

    processed_frames = []
    for i in range(len(resized_buffer)):
        # 後処理
        enhanced_np = enhanced_batch[i].permute(1, 2, 0).cpu().numpy()
        enhanced_np = np.power(enhanced_np, 1.0 / gamma_value)
        enhanced_np = color_preserve_ratio * enhanced_np + (1 - color_preserve_ratio) * normalized_frames[i]

        # CLAHE処理
        lab = cv2.cvtColor((enhanced_np * 255).astype(np.uint8), cv2.COLOR_RGB2LAB)
        l, a, b = cv2.split(lab)
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        l = clahe.apply(l)
        enhanced_lab = cv2.merge([l, a, b])
        enhanced_rgb = cv2.cvtColor(enhanced_lab, cv2.COLOR_LAB2RGB)
        enhanced_bgr = cv2.cvtColor(enhanced_rgb, cv2.COLOR_RGB2BGR)

        # 元解像度に復元
        if original_sizes[i]:
            enhanced_bgr = cv2.resize(enhanced_bgr, original_sizes[i])

        processed_frames.append(enhanced_bgr)

    # メモリ解放
    del batch_tensor
    del enhanced_batch
    torch.cuda.empty_cache()

    return processed_frames


# メイン処理
input_path = "input_video.mp4"
output_path = "enhanced_video.mp4"

# 動画を開く
cap = cv2.VideoCapture(input_path)
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

# 動画の明るさ分析による自動パラメータ調整
print("動画を分析中...")
brightness_values = []
sample_frames = 30
interval = max(1, total_frames // sample_frames)

for i in range(0, total_frames, interval):
    cap.set(cv2.CAP_PROP_POS_FRAMES, i)
    ret, frame = cap.read()
    if ret:
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        brightness_values.append(np.mean(gray))

cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
avg_brightness = np.mean(brightness_values)

# 明るさに基づいて自動調整
if avg_brightness < 50:
    gamma_value, color_preserve_ratio = 1.5, 0.8
elif avg_brightness < 100:
    gamma_value, color_preserve_ratio = 1.3, 0.7
else:
    gamma_value, color_preserve_ratio = 1.2, 0.7

print(f"自動設定: ガンマ={gamma_value}, 色彩保持={color_preserve_ratio}")

# デバイス設定(自動GPU/CPUフォールバック)
device = torch.device('cpu')
if torch.cuda.is_available():
    try:
        # 実際の処理解像度でテスト
        test_height = min(height, MAX_RESOLUTION)
        test_width = min(width, MAX_RESOLUTION)
        if max(height, width) > MAX_RESOLUTION:
            scale = MAX_RESOLUTION / max(height, width)
            test_width = int(width * scale)
            test_height = int(height * scale)

        test_tensor = torch.zeros(BATCH_SIZE, 3, test_height, test_width).cuda()
        del test_tensor
        torch.cuda.empty_cache()
        device = torch.device('cuda')
    except:
        print("GPU メモリ不足のため CPU を使用します")

print(f"使用デバイス: {device}")

# モデルの初期化(n_curves=1で固定)
model = DCENet(n_curves=1).to(device)

if not os.path.exists(MODEL_PATH):
    print("学習済みモデルリポジトリをクローン中...")
    import git
    git.Repo.clone_from(REPO_URL, "Zero-DCE_extension")
    print("クローン完了")

# モデル重みの読み込み(辞書形式に対応)
checkpoint = torch.load(MODEL_PATH, map_location=device)
# 辞書形式とstate_dict直接形式の両方に対応
if isinstance(checkpoint, dict) and 'state_dict' in checkpoint:
    model.load_state_dict(checkpoint['state_dict'], strict=True)
else:
    model.load_state_dict(checkpoint, strict=True)
print("学習済みモデルを読み込みました")

model.eval()

# 出力動画の設定
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# バッチ処理
resized_buffer = []
original_sizes = []
frame_count = 0

while True:
    ret, frame = cap.read()
    if not ret:
        # 残りのフレームを処理
        if resized_buffer:
            processed_frames = process_batch(resized_buffer, original_sizes, model, device, gamma_value, color_preserve_ratio)
            for enhanced_bgr in processed_frames:
                out.write(enhanced_bgr)
        break

    # 解像度調整
    h, w = frame.shape[:2]
    if max(h, w) > MAX_RESOLUTION:
        scale = MAX_RESOLUTION / max(h, w)
        new_w, new_h = int(w * scale), int(h * scale)
        resized = cv2.resize(frame, (new_w, new_h))
        original_size = (w, h)
    else:
        resized = frame
        original_size = None

    resized_buffer.append(resized)
    original_sizes.append(original_size)

    # バッチサイズに達したら処理
    if len(resized_buffer) == BATCH_SIZE:
        processed_frames = process_batch(resized_buffer, original_sizes, model, device, gamma_value, color_preserve_ratio)
        for enhanced_bgr in processed_frames:
            out.write(enhanced_bgr)

        frame_count += len(resized_buffer)
        print(f"処理済みフレーム数: {frame_count}/{total_frames}")

        # バッファをクリア
        resized_buffer = []
        original_sizes = []

cap.release()
out.release()
print(f"処理完了: {output_path}")