多言語対応文埋め込み技術を用いた意味的類似性による単語クラスタリング(ソースコードと実行結果)

【概要】 単語抽出 → 埋め込み → K-means → クラスタ表示を行う.Sentence Transformers/E5の利用により,文脈を考慮した意味理解(多義語対応: 文脈により異なる意味を区別可能)が可能.日英混在テキストでも統一的に処理可能.このプログラムでは,他のモデルとも比較できるようにしている

Python開発環境,ライブラリ類

ここでは、最低限の事前準備について説明する。機械学習や深層学習を行う場合は、NVIDIA CUDA、Visual Studio、Cursorなどを追加でインストールすると便利である。これらについては別ページ https://www.kkaneko.jp/cc/dev/aiassist.htmlで詳しく解説しているので、必要に応じて参照してください。

Python 3.12 のインストール

インストール済みの場合は実行不要。

管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要である。

REM Python をシステム領域にインストール
winget install --scope machine --id Python.Python.3.12 -e --silent
REM Python のパス設定
set "PYTHON_PATH=C:\Program Files\Python312"
set "PYTHON_SCRIPTS_PATH=C:\Program Files\Python312\Scripts"
echo "%PATH%" | find /i "%PYTHON_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_PATH%" /M >nul
echo "%PATH%" | find /i "%PYTHON_SCRIPTS_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_SCRIPTS_PATH%" /M >nul

関連する外部ページ

Python の公式ページ: https://www.python.org/

AI エディタ Windsurf のインストール

Pythonプログラムの編集・実行には、AI エディタの利用を推奨する。ここでは,Windsurfのインストールを説明する。

管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行して、Windsurfをシステム全体にインストールする。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要となる。

winget install --scope machine Codeium.Windsurf -e --silent

関連する外部ページ

Windsurf の公式ページ: https://windsurf.com/

必要なライブラリのインストール

コマンドプロンプトを管理者として実行(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する


pip install sentence-transformers beautifulsoup4 scikit-learn sudachipy sudachidict-core

多言語対応文埋め込み技術による文書分析・単語クラスタリングプログラム

概要

このプログラムは音声から周波数スペクトログラムを計算し、深層学習モデルCREPEを用いて基本周波数(F0)を推定する。音声の物理的特性を数値化し、時間変化を可視化する。

主要技術

参考文献

[1] Kim, J. W., Salamon, J., Li, P., & Bello, J. P. (2018). CREPE: A Convolutional Representation for Pitch Estimation. In 2018 IEEE International Conference on Acoustics, Speech and Signal Processing (ICASSP) (pp. 161-165). IEEE.

[2] McFee, B., Raffel, C., Liang, D., Ellis, D. P., McVicar, M., Battenberg, E., & Nieto, O. (2015). librosa: Audio and music signal analysis in python. In Proceedings of the 14th python in science conference (Vol. 8, pp. 18-25).


# 多言語対応文埋め込み技術による文書分析・単語クラスタリングプログラム
# 特徴技術名: Sentence Transformers / E5 (多言語対応文埋め込み技術)
# 出典: Wang, L., et al. (2022). Text Embeddings by Weakly-Supervised Contrastive Pre-training. arXiv preprint arXiv:2212.03533. / Reimers, N., & Gurevych, I. (2020). Making Monolingual Sentence Embeddings Multilingual using Knowledge Distillation. EMNLP.
# 特徴機能: 多言語対応密な文埋め込み生成機能。E5モデルは1024次元のベクトルに変換し、94言語に対応。MTEBベンチマークで性能を実証。コサイン類似度による意味的類似性計算が可能
# 学習済みモデル: multilingual-e5-large。Microsoft製の多言語埋め込みモデル。94言語対応、1024次元の密なベクトル空間に文を変換。560Mパラメータ。URL: https://huggingface.co/intfloat/multilingual-e5-large
# 方式設計:
#   関連利用技術: SudachiPy(日本語形態素解析器、Cモード長単位分割使用)、scikit-learn K-means(教師なしクラスタリングアルゴリズム)、Beautiful Soup 4(HTMLパーサー)、urllib(HTTPリクエスト処理)、tkinter(GUIファイル選択ダイアログ)、Sentence Transformers/E5モデル(埋め込み技術)
#   入力と出力: 入力: URL(ユーザは「0:URL手入力,1:サンプルページ,2:ファイル選択」のメニューで選択。0:URL手入力の場合は手入力。1の場合はhttps://ja.wikipedia.org/wiki/機械学習を使用。2の場合はtkinterでファイル選択)、出力: 処理結果をprint()で表示。プログラム終了時にprint()で表示した処理結果をresult.txtファイルに保存し、「result.txtに保存」したことをprint()で表示
#   処理手順: URL/ファイルからコンテンツ取得→Beautiful SoupでHTMLパース→テキスト抽出と正規化→SudachiPy Cモードで形態素解析→名詞・動詞・形容詞を抽出→TF-IDF値計算→E5/Sentence Transformersで単語埋め込み生成→K-meansクラスタリング→クラスタ結果表示
#   前処理、後処理: 前処理: HTMLタグ除去、空白文字正規化、日本語・英数字以外の文字除去。後処理: クラスタごとにTF-IDF値でソート、サマリレポート生成
#   追加処理: エルボー法とシルエット分析による最適クラスタ数の自動決定。複数の埋め込みモデルから選択可能(E5-large/base、BGE-M3、従来モデル)。文字エンコーディング自動判定(UTF-8/Shift_JIS)
#   調整を必要とする設定値: 単語の最小文字数(1文字以上または2文字以上を選択)、埋め込みモデル(4種類から選択)
# 将来方策: 単語の最小文字数は実行時にユーザが選択可能。埋め込みモデルも実行時に4種類から選択可能として実装済み
# その他の重要事項: 日本語文字判定に正規表現使用(ひらがな・カタカナ・漢字・CJK統合漢字拡張)
# 前準備:
#   pip install sentence-transformers beautifulsoup4 scikit-learn sudachipy sudachidict-core

from collections import Counter
import urllib.request
import urllib.parse
import re
import tkinter as tk
from tkinter import filedialog
import os
from bs4 import BeautifulSoup
from sentence_transformers import SentenceTransformer
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import silhouette_score
from sudachipy import Dictionary, SplitMode

# 設定値
DEFAULT_N_CLUSTERS = 3  # デフォルトクラスタ数
MAX_CLUSTERS_FOR_ELBOW = 10  # エルボー法での最大クラスタ数
JAPANESE_CHAR_PATTERN = r'[\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FAF\u3400-\u4DBF]'  # 日本語文字パターン
RANDOM_STATE = 42  # 乱数シード(再現性のため)


def normalize_text(text):
    """テキストの正規化処理"""
    text = re.sub(r'\s+', ' ', text)
    text = re.sub(r'[^\w\s\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FAF\u3400-\u4DBF]', ' ', text)
    return text.strip()


def read_file_content(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        if file_path.lower().endswith('.html'):
            soup = BeautifulSoup(content, 'html.parser')
            text = soup.get_text()
        else:
            text = content

        return normalize_text(text)
    except Exception as e:
        print(f'ファイルの読み込みに失敗しました: {file_path}')
        print(f'エラー: {e}')
        exit()


def download_and_extract_text(url):
    try:
        parsed_url = urllib.parse.urlparse(url)
        encoded_path = urllib.parse.quote(parsed_url.path.encode('utf-8'))
        encoded_url = urllib.parse.urlunparse((
            parsed_url.scheme,
            parsed_url.netloc,
            encoded_path,
            parsed_url.params,
            parsed_url.query,
            parsed_url.fragment
        ))

        with urllib.request.urlopen(encoded_url) as response:
            html_content = response.read()

        encoding = 'utf-8'
        try:
            html_content = html_content.decode(encoding)
        except UnicodeDecodeError:
            encoding = 'shift_jis'
            html_content = html_content.decode(encoding, errors='ignore')

        soup = BeautifulSoup(html_content, 'html.parser')
        text = soup.get_text()

        return normalize_text(text)
    except Exception as e:
        print(f'URLからのダウンロードに失敗しました: {url}')
        print(f'エラー: {e}')
        exit()


def extract_words(text):
    print('SudachiPy Cモード(長単位)で日本語を分析中...')

    print()
    print('単語の文字数設定を選択してください:')
    print('1: 1文字以上の単語をすべて処理(例:法、権、国、日本、天皇、憲法)')
    print('2: 2文字以上の単語のみを処理(例:日本、天皇、憲法、国政、政府)')

    length_choice = input('選択: ')
    min_length = 1 if length_choice == '1' else 2

    tokenizer_obj = Dictionary().create()
    split_mode = SplitMode.C

    words = []

    # 正規表現で単語候補を抽出
    word_cands = re.findall(r'\b\w+\b', text)

    for word in word_cands:
        if len(word) < min_length:
            continue

        # 日本語文字が含まれているかチェック
        if re.search(JAPANESE_CHAR_PATTERN, word):
            # 日本語の場合、SudachiPyで処理
            morphemes = tokenizer_obj.tokenize(word, split_mode)
            for m in morphemes:
                pos = m.part_of_speech()[0]
                if pos in ['名詞', '動詞', '形容詞'] and len(m.surface()) >= min_length:
                    words.append(m.surface())
        else:
            # 英語の場合、そのまま追加
            words.append(word)

    # TF-IDF値を計算
    word_counts = Counter(words)
    unique_words = list(word_counts.keys())
    word_text = ' '.join(words)

    vectorizer = TfidfVectorizer(vocabulary=unique_words)
    tfidf_matrix = vectorizer.fit_transform([word_text])
    feature_names = vectorizer.get_feature_names_out()

    word_tfidf = {}
    for i, word in enumerate(feature_names):
        word_tfidf[word] = tfidf_matrix[0, i]

    return word_tfidf


def perform_clustering(word_tfidf, n_clusters=DEFAULT_N_CLUSTERS):
    words = list(word_tfidf.keys())
    if len(words) < n_clusters:
        n_clusters = max(1, len(words))

    # モデル選択
    print()
    print('埋め込みモデルを選択してください:')
    print('0: multilingual-e5-large (1024次元, 94言語, 560Mパラメータ) [推奨]')
    print('1: multilingual-e5-base (768次元, 94言語, 278Mパラメータ)')
    print('2: bge-m3-retromae (1024次元, 100+言語, 568Mパラメータ, 8192トークン対応)')
    print('3: paraphrase-multilingual-MiniLM-L12-v2 (384次元, 50言語, 118Mパラメータ) [軽量]')

    model_choice = input('選択: ')
    models = {
        '0': 'intfloat/multilingual-e5-large',
        '1': 'intfloat/multilingual-e5-base',
        '2': 'BAAI/bge-m3-retromae',
        '3': 'sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2'
    }
    model_name = models.get(model_choice, 'intfloat/multilingual-e5-large')

    try:
        model = SentenceTransformer(model_name)
        embeddings = model.encode(words)

        # エルボー法で最適クラスタ数を決定
        if len(words) >= 3:
            max_k = min(MAX_CLUSTERS_FOR_ELBOW, len(words))
            inertias = []
            for k in range(3, max_k + 1):
                kmeans_temp = KMeans(n_clusters=k, random_state=RANDOM_STATE, n_init='auto')
                kmeans_temp.fit(embeddings)
                inertias.append(kmeans_temp.inertia_)

            # エルボーポイントを検出
            if len(inertias) > 2:
                diffs = [inertias[i] - inertias[i+1] for i in range(len(inertias)-1)]
                second_diffs = [diffs[i] - diffs[i+1] for i in range(len(diffs)-1)]
                optimal_k = second_diffs.index(max(second_diffs)) + 4 if second_diffs else 3
                optimal_k = min(optimal_k, len(words))
            else:
                optimal_k = 3

            # シルエット分析で最適クラスタ数を決定
            sil_scores = []
            for k in range(2, max_k + 1):
                kmeans_temp = KMeans(n_clusters=k, random_state=RANDOM_STATE, n_init='auto')
                labels = kmeans_temp.fit_predict(embeddings)
                score = silhouette_score(embeddings, labels)
                sil_scores.append(score)

            sil_optimal = sil_scores.index(max(sil_scores)) + 2

            # 最終決定(エルボー法とシルエット分析の平均)
            n_clusters = int((optimal_k + sil_optimal) / 2)
            n_clusters = min(n_clusters, len(words))

            print(f'エルボー法最適値: {optimal_k}')
            print(f'シルエット分析最適値: {sil_optimal}')
            print(f'最適クラスタ数: {n_clusters}')
        else:
            print(f'最適クラスタ数: {n_clusters}')

        kmeans = KMeans(n_clusters=n_clusters, random_state=RANDOM_STATE, n_init='auto')
        cluster_labels = kmeans.fit_predict(embeddings)

        clusters = {}
        for word, label in zip(words, cluster_labels):
            if label not in clusters:
                clusters[label] = []
            clusters[label].append((word, word_tfidf[word]))

        for cluster_id in clusters:
            clusters[cluster_id].sort(key=lambda x: x[1], reverse=True)

        return clusters, embeddings, cluster_labels
    except Exception as e:
        print(f'クラスタリング処理に失敗しました: {e}')
        exit()


def display_and_save_results(clusters, total_words):
    print()
    print('=== クラスタリング結果 ===')
    print(f'総単語数: {total_words}')
    print(f'クラスタ数: {len(clusters)}')
    print('※ カッコ内はTF-IDF値')
    print()

    for cluster_id, word_list in clusters.items():
        word_with_counts = ', '.join([f'{word}({tfidf:.2f})' for word, tfidf in word_list])
        print(f'クラスタ {cluster_id} ({len(word_list)}語): {word_with_counts}')
        print('----------------------------------------------')

    print()
    print('=== サマリレポート ===')
    sorted_clusters = sorted(clusters.items(), key=lambda x: len(x[1]), reverse=True)
    for cluster_id, word_list in sorted_clusters:
        top_words = [word for word, tfidf in word_list[:10]]
        word_summary = ', '.join(top_words)
        print(f'クラスタ{cluster_id} ({len(word_list)}語): {word_summary}')
    print()

    results = []
    results.append('=== 単語クラスタリング結果 ===\n')
    results.append(f'総単語数: {total_words}\n')
    results.append(f'クラスタ数: {len(clusters)}\n')
    results.append('※ カッコ内はTF-IDF値\n\n')

    for cluster_id, word_list in clusters.items():
        word_with_counts = ', '.join([f'{word}({tfidf:.2f})' for word, tfidf in word_list])
        results.append(f'クラスタ {cluster_id} ({len(word_list)}語): {word_with_counts}\n')
        results.append('----------------------------------------------\n')

    results.append('\n=== サマリレポート ===\n')
    sorted_clusters = sorted(clusters.items(), key=lambda x: len(x[1]), reverse=True)
    for cluster_id, word_list in sorted_clusters:
        top_words = [word for word, tfidf in word_list[:10]]
        word_summary = ', '.join(top_words)
        results.append(f'クラスタ{cluster_id} ({len(word_list)}語): {word_summary}\n')

    result_path = os.path.join('.', 'result.txt')
    with open(result_path, 'w', encoding='utf-8') as f:
        f.writelines(results)

    print('result.txtに保存しました')


# プログラム開始
print('=== URL文書分析・単語クラスタリングシステム ===')
print('HTMLページまたはファイルから単語を抽出し、意味的類似性によりクラスタリングします')
print()
print('【プログラムの概要】')
print('- Sentence Transformers/E5による多言語対応の単語埋め込み')
print('- K-meansクラスタリングによる単語の意味的分類')
print('- 結果はコンソール表示とresult.txtファイルに保存')
print()
print('【ユーザが行う操作】')
print('1. 入力方法の選択(URL手入力/サンプル/ファイル)')
print('2. 単語の最小文字数の選択(1文字以上/2文字以上)')
print('3. 埋め込みモデルの選択(4種類から選択)')
print()

print('0: URL手入力')
print('1: サンプルページ (https://ja.wikipedia.org/wiki/機械学習)')
print('2: ファイル選択')

choice = input('選択: ')

if choice == '0':
    url = input('URLを入力してください: ')
    print(f'URL処理中: {url}')
    text = download_and_extract_text(url)
elif choice == '1':
    url = 'https://ja.wikipedia.org/wiki/機械学習'
    print(f'URL処理中: {url}')
    text = download_and_extract_text(url)
elif choice == '2':
    root = tk.Tk()
    root.withdraw()
    file_path = filedialog.askopenfilename(
        title='ファイルを選択してください',
        filetypes=[('Text files', '*.txt'), ('HTML files', '*.html'), ('All files', '*.*')]
    )
    if not file_path:
        print('ファイルが選択されませんでした')
        exit()
    print(f'ファイル処理中: {file_path}')
    text = read_file_content(file_path)
else:
    print('無効な選択です')
    exit()

word_tfidf = extract_words(text)

if len(word_tfidf) == 0:
    print('有効な単語が見つかりませんでした')
    exit()

print(f'抽出された単語数: {len(word_tfidf)}')
clusters, embeddings, cluster_labels = perform_clustering(word_tfidf)

print()
print('=== 結果について ===')
print('【得られる結果】')
print('・クラスタ番号(0, 1, 2...)とそれに所属する単語の一覧')
print('・同じクラスタの単語は意味的に類似している単語群')
print('・異なるクラスタの単語は意味的に異なる単語群')
print()
print('【対象となる単語】')
print('・記事から抽出された単語(重複除去済み)')
print('・単語の重要度(TF-IDF値)を計算し表示')
print('・助詞や短い語は除外されるため、主に名詞・動詞・形容詞が対象')
print()
print('【算出方法】')
print('1. 各単語をSentence Transformers/E5で密なベクトルに変換')
print('2. ベクトル間の距離を計算して類似性を測定')
print('3. K-meansアルゴリズムで距離の近い単語をクラスタ化')
print('4. 同じクラスタ番号の単語は意味的に関連性が高いと判定')
print()

display_and_save_results(clusters, len(word_tfidf))