リレーショナルデータベースのテーブルでリストを扱う例(SQLite 3, Python を使用)
リレーショナルデータベースのテーブルにおいて,リスト L = (e1, e2, ..., en) を {(1, e1), (2, e2), ..., (n, en)} の形式でマッピングする手法について詳しく解説する.このとき,リレーションスキーマは R(要素番号, 要素値)となる.複数のリスト {L1, L2, ..., Lk} をマッピングする場合は,リスト番号の属性を含めたリレーションスキーマ R(リスト番号, 要素番号, 要素値) を採用する.
- テストデータ生成用 Python プログラムの実装
- テストデータベースの構築手順
- エッジリストの効率的な生成方法
- 大規模テストデータベースの生成と性能評価
前準備
SQLite 3 の詳細説明: 別ページ »を参照
◆ テストデータ生成用 Python プログラム
CSV形式のテストデータを効率的に生成し,標準出力へ出力するプログラムの実装
◆ Python プログラム
#!/usr/bin/env python3 # -*- coding: utf-8 -*- # usage: python hoge.py 10 5 import sys import random import string LEN = 8 c = 0 TOTAL_LIST_NUM = int(sys.argv[1]) LIST_LEN = int(sys.argv[2]) print("# id, list_num, item_num, x, y, price, name") for i in range(1, TOTAL_LIST_NUM + 1): for j in range(1, LIST_LEN + 1): id = c list_num = i item_num = j x = 100 * random.random() y = 100 * random.random() price = int(1000 * random.random()) + 1 name = ''.join(random.choices(string.ascii_letters + string.digits, k=LEN)) print(f"{id}, {list_num}, {item_num}, {x:.6f}, {y:.6f}, {price}, {name}") c += 1
◆ テストデータベースの生成手順
以下の手順で,テストデータベースを効率的に生成する.上記の Python プログラムを hoge.py として保存し,次のパラメータで実行する:
- データベースファイル名: /tmp/1.db
- テーブル名: dat
- リスト数: 10
- 各リストの要素数: 5
◆ bash プログラム
#!/bin/bash rm -f /tmp/1.csv python /tmp/hoge.py 10 5 | fgrep -v '#' > /tmp/1.csv rm -f /tmp/1.$$.sql cat >/tmp/1.$$.sql << SQL create table dat ( id integer primary key not null, list_num integer, item_num integer, x real, y real, price integer, name text ); .mode csv .import /tmp/1.csv dat .exit SQL rm -f /tmp/1.db cat /tmp/1.$$.sql | sqlite3 /tmp/1.db
データベースの確認方法:
sqliteman /tmp/1.db
◆ エッジリストの生成プロセス
ノードリストの集合 {L1, L2, ..., Lk} において,各集合 Li は一連のノードで構成される.これらのノードリスト集合をリレーショナルデータベースのテーブル dat に効率的にマッピングし,自己結合操作を実行する.この結果,各行が1つのエッジ(2つのノードを接続する辺)を表現する最適化されたテーブルが生成される.
◆ エッジリスト生成のためのSQL文
select A.id, A.list_num, A.item_num, B.id, B.list_num, B.item_num from dat A, dat B where A.item_num + 1 = B.item_num AND A.list_num = B.list_num;
テストデータベース生成
エッジリスト生成システムの信頼性と性能を確保するため,包括的な性能評価を実施する.以下のパラメータセットで評価を行う:
- データベースファイル名: /tmp/500000.db,/tmp/1000000.db,/tmp/2000000.db,/tmp/5000000.db
- テーブル名: dat
- リスト数: 500000,1000000,2000000,5000000
- 各リストの要素数: 5
◆ テストスクリプト(Python版):
#!/usr/bin/env python3 import os import sqlite3 import subprocess import sys import time def create_database(db_name): sql = ''' create table dat ( id integer primary key not null, list_num integer, item_num integer, x number, y number, price integer, name text ); ''' conn = sqlite3.connect(f'/tmp/{db_name}.db') conn.execute(sql) conn.close() def populate_database(db_name, list_num, list_len): # Generate CSV using Python script csv_file = f'/tmp/{db_name}.csv' subprocess.run([ 'python', '/tmp/hoge.py', str(list_num), str(list_len) ], stdout=open(csv_file, 'w')) # Import CSV to SQLite conn = sqlite3.connect(f'/tmp/{db_name}.db') with open(csv_file) as f: next(f) # Skip header conn.executemany( 'INSERT INTO dat VALUES (?,?,?,?,?,?,?)', [line.strip().split(',') for line in f] ) conn.commit() conn.close() def run_performance_test(db_name): print(f'Testing {db_name}.db') start_time = time.time() conn = sqlite3.connect(f'/tmp/{db_name}.db') conn.execute('DROP TABLE IF EXISTS T') conn.execute(''' CREATE TABLE T AS SELECT A.id, A.list_num, A.item_num, B.id, B.list_num, B.item_num FROM dat A, dat B WHERE A.item_num + 1 = B.item_num AND A.list_num = B.list_num ''') count = conn.execute('SELECT COUNT(*) FROM T').fetchone()[0] conn.close() end_time = time.time() print(f'Count: {count}') print(f'Time taken: {end_time - start_time:.2f} seconds\n') def main(): test_sizes = [500000, 1000000, 2000000, 5000000] # Create and populate databases for size in test_sizes: create_database(str(size)) populate_database(str(size), size, 5) # Run performance tests for size in test_sizes: run_performance_test(str(size)) if __name__ == '__main__': main()
◆ 二次索引の適用と性能評価
システム性能を最適化するため,以下の Python スクリプトを使用して二次索引を実装し,包括的な性能評価を実施する:
#!/usr/bin/env python3 import sqlite3 import time def create_index_and_test(db_name): print(f'Testing {db_name}.db with index') # Create index conn = sqlite3.connect(f'/tmp/{db_name}.db') conn.execute('CREATE INDEX idx001 ON dat(item_num, list_num)') conn.close() # Run performance test start_time = time.time() conn = sqlite3.connect(f'/tmp/{db_name}.db') conn.execute('DROP TABLE IF EXISTS T') conn.execute(''' CREATE TABLE T AS SELECT A.id, A.list_num, A.item_num, B.id, B.list_num, B.item_num FROM dat A, dat B WHERE A.item_num + 1 = B.item_num AND A.list_num = B.list_num ''') count = conn.execute('SELECT COUNT(*) FROM T').fetchone()[0] conn.close() end_time = time.time() print(f'Count: {count}') print(f'Time taken: {end_time - start_time:.2f} seconds\n') def main(): test_sizes = ['500000', '1000000', '2000000', '5000000'] for size in test_sizes: create_index_and_test(size) if __name__ == '__main__': main()