リレーショナルデータベースのテーブルでリストを扱う例(SQLite 3, Python を使用)

リレーショナルデータベースのテーブルにおいて,リスト L = (e1, e2, ..., en) を {(1, e1), (2, e2), ..., (n, en)} の形式でマッピングする手法について詳しく解説する.このとき,リレーションスキーマは R(要素番号, 要素値)となる.複数のリスト {L1, L2, ..., Lk} をマッピングする場合は,リスト番号の属性を含めたリレーションスキーマ R(リスト番号, 要素番号, 要素値) を採用する.

前準備

SQLite 3 の詳細説明: 別ページ »を参照

◆ テストデータ生成用 Python プログラム

CSV形式のテストデータを効率的に生成し,標準出力へ出力するプログラムの実装

◆ Python プログラム

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# usage: python hoge.py 10 5

import sys
import random
import string

LEN = 8
c = 0
TOTAL_LIST_NUM = int(sys.argv[1])
LIST_LEN = int(sys.argv[2])

print("# id, list_num, item_num, x, y, price, name")

for i in range(1, TOTAL_LIST_NUM + 1):
    for j in range(1, LIST_LEN + 1):
        id = c
        list_num = i
        item_num = j
        x = 100 * random.random()
        y = 100 * random.random()
        price = int(1000 * random.random()) + 1
        name = ''.join(random.choices(string.ascii_letters + string.digits, k=LEN))

        print(f"{id}, {list_num}, {item_num}, {x:.6f}, {y:.6f}, {price}, {name}")

        c += 1

テストデータベースの生成手順

以下の手順で,テストデータベースを効率的に生成する.上記の Python プログラムを hoge.py として保存し,次のパラメータで実行する:

◆ bash プログラム

#!/bin/bash
rm -f /tmp/1.csv
python /tmp/hoge.py 10 5 | fgrep -v '#' > /tmp/1.csv

rm -f /tmp/1.$$.sql
cat >/tmp/1.$$.sql << SQL
create table dat (
  id       integer primary key not null,
  list_num integer,
  item_num integer,
  x        real,
  y        real,
  price    integer,
  name     text );
.mode csv
.import /tmp/1.csv dat
.exit
SQL

rm -f /tmp/1.db
cat /tmp/1.$$.sql | sqlite3 /tmp/1.db

データベースの確認方法:

sqliteman /tmp/1.db

◆ エッジリストの生成プロセス

ノードリストの集合 {L1, L2, ..., Lk} において,各集合 Li は一連のノードで構成される.これらのノードリスト集合をリレーショナルデータベースのテーブル dat に効率的にマッピングし,自己結合操作を実行する.この結果,各行が1つのエッジ(2つのノードを接続する辺)を表現する最適化されたテーブルが生成される.

エッジリスト生成のためのSQL文

select A.id, A.list_num, A.item_num, B.id, B.list_num, B.item_num from dat A, dat B where A.item_num + 1 = B.item_num AND A.list_num = B.list_num;

テストデータベース生成

エッジリスト生成システムの信頼性と性能を確保するため,包括的な性能評価を実施する.以下のパラメータセットで評価を行う:

◆ テストスクリプト(Python版):

#!/usr/bin/env python3
import os
import sqlite3
import subprocess
import sys
import time

def create_database(db_name):
    sql = '''
    create table dat (
        id       integer primary key not null,
        list_num integer,
        item_num integer,
        x        number,
        y        number,
        price    integer,
        name     text
    );
    '''
    conn = sqlite3.connect(f'/tmp/{db_name}.db')
    conn.execute(sql)
    conn.close()

def populate_database(db_name, list_num, list_len):
    # Generate CSV using Python script
    csv_file = f'/tmp/{db_name}.csv'
    subprocess.run([
        'python', '/tmp/hoge.py', str(list_num), str(list_len)
    ], stdout=open(csv_file, 'w'))

    # Import CSV to SQLite
    conn = sqlite3.connect(f'/tmp/{db_name}.db')
    with open(csv_file) as f:
        next(f)  # Skip header
        conn.executemany(
            'INSERT INTO dat VALUES (?,?,?,?,?,?,?)',
            [line.strip().split(',') for line in f]
        )
    conn.commit()
    conn.close()

def run_performance_test(db_name):
    print(f'Testing {db_name}.db')
    start_time = time.time()

    conn = sqlite3.connect(f'/tmp/{db_name}.db')
    conn.execute('DROP TABLE IF EXISTS T')
    conn.execute('''
        CREATE TABLE T AS
        SELECT A.id, A.list_num, A.item_num, B.id, B.list_num, B.item_num
        FROM dat A, dat B
        WHERE A.item_num + 1 = B.item_num AND A.list_num = B.list_num
    ''')
    count = conn.execute('SELECT COUNT(*) FROM T').fetchone()[0]
    conn.close()

    end_time = time.time()
    print(f'Count: {count}')
    print(f'Time taken: {end_time - start_time:.2f} seconds\n')

def main():
    test_sizes = [500000, 1000000, 2000000, 5000000]

    # Create and populate databases
    for size in test_sizes:
        create_database(str(size))
        populate_database(str(size), size, 5)

    # Run performance tests
    for size in test_sizes:
        run_performance_test(str(size))

if __name__ == '__main__':
    main()

◆ 二次索引の適用と性能評価

システム性能を最適化するため,以下の Python スクリプトを使用して二次索引を実装し,包括的な性能評価を実施する:

#!/usr/bin/env python3
import sqlite3
import time

def create_index_and_test(db_name):
    print(f'Testing {db_name}.db with index')

    # Create index
    conn = sqlite3.connect(f'/tmp/{db_name}.db')
    conn.execute('CREATE INDEX idx001 ON dat(item_num, list_num)')
    conn.close()

    # Run performance test
    start_time = time.time()

    conn = sqlite3.connect(f'/tmp/{db_name}.db')
    conn.execute('DROP TABLE IF EXISTS T')
    conn.execute('''
        CREATE TABLE T AS
        SELECT A.id, A.list_num, A.item_num, B.id, B.list_num, B.item_num
        FROM dat A, dat B
        WHERE A.item_num + 1 = B.item_num AND A.list_num = B.list_num
    ''')
    count = conn.execute('SELECT COUNT(*) FROM T').fetchone()[0]
    conn.close()

    end_time = time.time()
    print(f'Count: {count}')
    print(f'Time taken: {end_time - start_time:.2f} seconds\n')

def main():
    test_sizes = ['500000', '1000000', '2000000', '5000000']
    for size in test_sizes:
        create_index_and_test(size)

if __name__ == '__main__':
    main()