pythonでDB処理(MariaDB)
更新日:2022年1月16日
前提
pythonからMariaDBにアクセスする処理について記載します。
使用するバージョンは、これを記載している時点での最新を使用します。
python3.10
MariaDB10.6.5
MariaDB Connector/Python(1.0)
MariaDB Connector/Pythonについては、以下にドキュメントがあります。
MariaDBの各種言語向けのコネクタドキュメント:https://mariadb.com/docs/clients/mariadb-connectors/
python用コネクタのドキュメントトップページ:https://mariadb.com/docs/clients/mariadb-connectors/connector-python/
python用のコネクタのAPIリファレンス:https://mariadb.com/docs/reference/conpy/api/
サンプルコードでは、MariaDB上に、「testdb」というデータベースを作成し、そこに以下のテーブルを作成したものに、処理を行っています。
各テーブルに、特に深い意味は無く、なんとなく作ったものです。
create table Item(
code varchar(32),
itemName varchar(64),
createTime timestamp,
PRIMARY KEY(code)
);
create table warehouse(
code varchar(16),
wareHouseName varchar(256),
PRIMARY KEY(code)
);
create table Stock(
itemCode varchar(32),
warehouseCode varchar(16),
quantity int,
lastUpdate timestamp,
PRIMARY KEY(itemCode, warehouseCode),
FOREIGN KEY fk_Stock_itemCode(itemCode) REFERENCES Item(code),
FOREIGN KEY fk_Stock_warehouse(warehouseCode) REFERENCES warehouse(code)
);
create table StockJournal(
id bigint auto_increment,
itemCode varchar(32),
warehouseCode varchar(16),
quantity int,
journalDate timestamp,
PRIMARY KEY(id),
FOREIGN KEY fk_StockJournal_itemCode(itemCode) REFERENCES Stock(itemCode),
FOREIGN KEY fk_StockJournal_warehouse(warehouseCode) REFERENCES Stock(warehouseCode)
);
CREATE VIEW V_ITEM_STOCK(
itemCode,
itemName,
warehouseCode,
warehouseName,
quantity
) AS
SELECT
a.code,
a.itemName,
c.code,
c.warehouseName,
b.quantity
FROM item a
INNER JOIN stock b
ON a.code = b.itemCode
INNER JOIN warehouse c
ON b.warehouseCode = c.code
ORDER BY a.code, c.code;
環境構築
pythonからMariaDBにアクセスするには、ドライバが必要です(どのDBでも同じですが)。
MariaDBについては、以下に公式からドキュメントが提供されていますので、これに沿って作業すればOKです。
→ https://mariadb.com/ja/resources/blog/how-to-connect-python-programs-to-mariadb/
コマンドプロンプトにて、以下のコマンドを実行することで、追加されます。
pip.exe install mariadb
※pipのコマンドへパスが通ってない場合は、pip.exeのパスを絶対パスで指定してあげればOK。
デフォルトでは以下のパスにあるはずです。
C:\Users\(ユーザー名)\AppData\Local\Programs\Python\Python310\Scripts\pip.exe
SELECTの基本
WHERE句無しと、有りでSELECT文を実行しています。
ポイントとしては、WHERE句のところで、「?」というプレースホルダを使用しています。
このように記載することで、ドライバ側でサニタイジングしてくれるとのことです。
SQLインジェクション対策としては、この書き方にしていればOKな様子。
また、「print("実行したSQL:", cur.statement)」で実行したSQLの表示を行っていますが、これはプレースホルダがバインドされた値に置き換わっていない状態(つまり「?」のまま)となります。
置き換わった状態のSQLを表示するのは無理な様子です(クエリを送った後、パラメータについてはバイナリで送るからとかなんとか・・・。詳細は良く知りません。)。
import mariadb
# DBのコネクションを返す
def createConnection():
try:
conn = mariadb.connect(
user='root', # MariaDBのユーザーID
password='password', # MariaDBのrootユーザーのパスワード
host='127.0.0.1', # MariaDBのサーバーアドレス
port=3306, # MariaDBのポート番号
database='testdb' # デフォルトで使用するDB
)
except mariadb.Error as e:
print(f"DB接続エラー:{e}")
return None
return conn
# 全レコードをSELECTを実行する
def execSelectAll(conn):
sql = 'SELECT code, itemName, createTime FROM item'
cur = conn.cursor()
try:
# SQLを実行する
cur.execute(sql)
# SELECT結果を取得する
for record in cur:
print('code=', record[0], ' itemName=', record[1], ' createTime=', record[2])
finally:
cur.close()
# 条件を指定してSELECTを実行する
def execSelectWhere(conn, code):
# プレースホルダ(「?」の部分)を設けたSQL
sql = 'SELECT code, itemName, createTime FROM item WHERE code = ?'
# 上記のプレースホルダーにセットするタプル
param = (code,) # 要素が1個しかない場合、末尾の「,」を忘れずにつけること
cur = conn.cursor()
try:
# SQLを実行する
cur.execute(sql, param)
print("実行したSQL:", cur.statement)
# SELECT結果を取得する
for record in cur:
print('code=', record[0], ' itemName=', record[1], ' createTime=', record[2])
finally:
cur.close()
# 処理を実行する
def execSQL():
# DBのコネクションを作成
conn = createConnection()
if conn == None:
return
try:
print("■全件をselect")
execSelectAll(conn)
print("■where句を追加してselect")
execSelectWhere(conn, 'ITM0001')
except mariadb.Error as e:
print(f"DBエラー:{e}")
finally:
conn.close()
# execSQLを実行
execSQL()
実行結果
■全件をselect code= ITM0001 itemName= 柔らかい石 createTime= 2022-01-09 19:24:36 code= ITM0002 itemName= 硬い石 createTime= 2022-01-09 19:24:36 code= ITM0003 itemName= 不思議な石 createTime= 2022-01-09 19:24:36 ■where句を追加してselect 実行したSQL: SELECT code, itemName, createTime FROM item WHERE code = ? code= ITM0001 itemName= 柔らかい石 createTime= 2022-01-09 19:24:36
SELECT結果からテーブル情報を取得する
SELECTした結果から、カラム名や型の情報が取得できます。
カーソルのdescriptionから、カラムの情報が取得でき、配列になっています。
以下のデータの並びになっている様子です。
配列のindex | 格納されている値 |
0 | name:カラム名 |
1 | type_code:カラムの型コード(mariadb.fieldinfo().type関数で、型名に変換可能)。またこの値は、定数mariadb.constants.FIELD_TYPEに定義されている。 |
2 | display_size:カラムのサイズ。varchar(32)なら、32がセットされている様子。 |
3 | internal_size:何のサイズか不明・・・。 |
4 | precision:数値型とかの精度のことかな・・・? |
5 | scale:数値型とかの精度のことかな・・・? |
6 | null_ok:not null制約があるかないかをTrue/Flaseで返している様子。 |
8 | field_flags:フィールドのフラグ情報。定数mariadb.constants.FIELD_FLAGがあるはずなんだけど、見つからず・・・。なので、謎。 |
import mariadb
import mariadb.constants # ソース中では使用していないが、「FIELD_TYPE」などが使えるようになる
# DBのコネクションを返す
def createConnection():
"""
DB接続オブジェクトを返す.
"""
try:
conn = mariadb.connect(
user='root', # MariaDBのユーザーID
password='password', # MariaDBのrootユーザーのパスワード
host='127.0.0.1', # MariaDBのサーバーアドレス
port=3306, # MariaDBのポート番号
database='testdb' # デフォルトで使用するDB
)
except mariadb.Error as e:
print(f"DB接続エラー:{e}")
return None
return conn
# selectした結果のと、テーブル情報を取得する
def execSelect(conn):
cur = conn.cursor()
# SQLを実行する
cur.execute('SELECT code, itemName, createTime FROM item')
# カラムの型のIDから、カラムの型名に変換してくれるオブジェクトを取得
fInfo = mariadb.fieldinfo()
print("■SELECT結果のカラム情報")
# 1カラム毎の情報を表示する
for fieldDesc in cur.description:
# 1つのカラムが持っている情報を表示する
for desc in fieldDesc:
print(desc, end=',')
# カラムの型を表示
print("")
print(' カラムの型:', fInfo.type(fieldDesc))
print(' カラムのフラグ情報:', fInfo.flag(fieldDesc))
print("■SELECTしたレコードの情報")
# SELECT結果を取得する
for record in cur:
print('code=', record[0], ' itemName=', record[1], ' createTime=', record[2])
# 処理を実行する
def execSQL():
# DBのコネクションを作成
conn = createConnection()
if conn == None:
return
try:
execSelect(conn)
except mariadb.Error as e:
print(f"DBエラー:{e}")
finally:
conn.close()
# execSQLを実行
execSQL()
実行結果
■SELECT結果のカラム情報 code,253,32,128,0,0,False,20611, カラムの型: VAR_STRING カラムのフラグ情報: NOT_NULL | PRIMARY_KEY | BINARY | NO_DEFAULT itemName,253,64,256,0,0,True,128, カラムの型: VAR_STRING カラムのフラグ情報: BINARY createTime,7,4,19,0,0,False,9377, カラムの型: TIMESTAMP カラムのフラグ情報: NOT_NULL | UNSIGNED | BINARY | TIMESTAMP | UPDATE_TIMESTAMP ■SELECTしたレコードの情報 code= ITM0001 itemName= 柔らかい石 createTime= 2022-01-09 19:24:36 code= ITM0002 itemName= 硬い石 createTime= 2022-01-09 19:24:36 code= ITM0003 itemName= 不思議な石 createTime= 2022-01-09 19:24:36
INSERT/UPDATE/DELETEを実行する(autocommit)
トランザクションは使用せず、autocommitでINSERT/UPDATE/DELETEをやってみたいと思います。
DB接続を行う際、「autocommit=True」のオプションを追加するだけでOKです。
SQL実行後、即座に結果が反映されます。
また、autocommitの話とは別ですが、cursorのexecuteメソッドとexecutemanyメソッドの使い方についても記載しています。
import mariadb
import mariadb.constants # ソース中では使用していないが、「FIELD_TYPE」などが使えるようになる
# DBのコネクションを返す
def createConnection():
"""
DB接続オブジェクトを返す.
"""
try:
conn = mariadb.connect(
user='root', # MariaDBのユーザーID
password='password', # MariaDBのrootユーザーのパスワード
host='127.0.0.1', # MariaDBのサーバーアドレス
port=3306, # MariaDBのポート番号
database='testdb', # デフォルトで使用するDB
autocommit=True # autocommitを使用する(デフォルトではFalse)
)
except mariadb.Error as e:
print(f"DB接続エラー:{e}")
return None
return conn
# insertを実行
def execInsert(conn):
sql = 'INSERT INTO StockJournal(itemCode, warehouseCode, quantity, journalDate) VALUES(?, ?, ?, CURRENT_TIME)'
param1 = ('ITM0001', 'T1', 100)
param2 = [
('ITM0001', 'T1', 100),
('ITM0002', 'T1', 50),
('ITM0003', 'C1', 200),
]
cur = conn.cursor()
try:
# 1レコードのみinsert
cur.execute(sql, param1)
# 複数レコードinsert
cur.executemany(sql, param2)
except Exception as e:
print(f"insertの処理に失敗しました:{e}")
finally:
cur.close()
# updateを実行
def execUpdate(conn):
sql = 'UPDATE Item SET itemName= ? WHERE code = ?'
param1 = ('硬い石new', 'ITM0002')
param2 = [
('柔らかい石new', 'ITM0001'),
('不思議な石new', 'ITM0003'),
]
cur = conn.cursor()
try:
# 1レコードのみinsert
cur.execute(sql, param1)
# 複数レコードinsert
cur.executemany(sql, param2)
except Exception as e:
print(f"updateの処理に失敗しました:{e}")
finally:
cur.close()
# deleteを実行
def execDelete(conn):
sql = 'DELETE FROM StockJournal WHERE itemCode = ?'
param1 = ('ITM0001',)
param2 = [
('ITM0002',),
('ITM0003',),
]
cur = conn.cursor()
try:
# 1レコードのみinsert
cur.execute(sql, param1)
# 複数レコードinsert
cur.executemany(sql, param2)
except Exception as e:
print(f"deleteの処理に失敗しました:{e}")
finally:
cur.close()
# 処理を実行する
def execSQL():
# DBのコネクションを作成
conn = createConnection()
if conn == None:
return
try:
execInsert(conn)
execUpdate(conn)
execDelete(conn)
except mariadb.Error as e:
print(f"DBエラー:{e}")
finally:
conn.close()
# execCRUDを実行
execSQL()
トランザクションを使用する
DBとのコネクションを作成する際、オプションでautocommitを有効にするかどうかを指定するオプションがあります。
ここにFalseを指定することで、トランザクションを使えるようになります。
以下に例を示します。
import mariadb
import mariadb.constants # ソース中では使用していないが、「FIELD_TYPE」などが使えるようになる
# DBのコネクションを返す
def createConnection():
"""
DB接続オブジェクトを返す.
"""
try:
conn = mariadb.connect(
user='root', # MariaDBのユーザーID
password='password', # MariaDBのrootユーザーのパスワード
host='127.0.0.1', # MariaDBのサーバーアドレス
port=3306, # MariaDBのポート番号
database='testdb', # デフォルトで使用するDB
autocommit=False # autocommitを無効にする(デフォルトで無効になっているが念のため)
)
except mariadb.Error as e:
print(f"DB接続エラー:{e}")
return None
return conn
# insertを実行
def execInsert(conn):
sql = 'INSERT INTO StockJournal(itemCode, warehouseCode, quantity, journalDate) VALUES(?, ?, ?, CURRENT_TIME)'
param1 = ('ITM0001', 'T1', 100)
param2 = [
('ITM0001', 'T1', 100),
('ITM0002', 'T1', 50),
('ITM0003', 'C1', 200),
]
cur = conn.cursor()
try:
# 1レコードのみinsert
cur.execute(sql, param1)
# 複数レコードinsert
cur.executemany(sql, param2)
except Exception as e:
print(f"insertの処理に失敗しました:{e}")
finally:
cur.close()
# updateを実行
def execUpdate(conn):
sql = 'UPDATE Item SET itemName= ? WHERE code = ?'
param1 = ('硬い石new', 'ITM0002')
param2 = [
('柔らかい石new', 'ITM0001'),
('不思議な石new', 'ITM0003'),
]
cur = conn.cursor()
try:
# 1レコードのみinsert
cur.execute(sql, param1)
# 複数レコードinsert
cur.executemany(sql, param2)
except Exception as e:
print(f"updateの処理に失敗しました:{e}")
finally:
cur.close()
# deleteを実行
def execDelete(conn):
sql = 'DELETE FROM StockJournal WHERE itemCode = ?'
param1 = ('ITM0001',)
param2 = [
('ITM0002',),
('ITM0003',),
]
cur = conn.cursor()
try:
# 1レコードのみinsert
cur.execute(sql, param1)
# 複数レコードinsert
cur.executemany(sql, param2)
except Exception as e:
print(f"deleteの処理に失敗しました:{e}")
finally:
cur.close()
# 処理を実行する
def execSQL():
# DBのコネクションを作成
conn = createConnection()
if conn == None:
return
try:
execInsert(conn)
execUpdate(conn)
# commitを実行する
conn.commit()
execDelete(conn)
# commitを実行する
conn.commit()
except mariadb.Error as e:
print(f"DBエラー:{e}")
# エラーが発生している場合rollbackを実行する
conn.rollback()
finally:
conn.close()
# execCRUDを実行
execSQL()
カーソルを使用する
SELECTした結果をフェッチする処理の例です。
executeでクエリを実行した後、fetchmanyメソッドで指定した数のレコード(サンプルでは100件)を取得しています。
import mariadb
# DBのコネクションを返す
def createConnection():
try:
conn = mariadb.connect(
user='TestUser1', # MariaDBのユーザーID
password='password', # MariaDBのrootユーザーのパスワード
host='192.168.11.50', # MariaDBのサーバーアドレス
port=3306, # MariaDBのポート番号
database='TestDB', # デフォルトで使用するDB
)
except mariadb.Error as e:
print(f"DB接続エラー:{e}")
return None
return conn
# selectを実行
def execSelectFetch(conn):
sql = 'SELECT id, itemCode, warehouseCode, quantity, journalDate FROM StockJournal ORDER BY ID'
cur = conn.cursor()
try:
# SQLを実行する
cur.execute(sql)
print("実行したSQL:", cur.statement)
counter = 0
# SELECT結果を取得する
while True:
# 引数で指定した数だけ、レコードを取得する
recordList = cur.fetchmany(100)
# 取得したレコードが0件の場合ループを抜ける
if not recordList:
break;
# なんとなく1,000,000件に1回ぐらいはレコードの情報を表示してみる
if counter % 10000 == 0:
# 100件フェッチしている中から1番目のデータを取得して、表示
record = recordList[0]
print("id:",record[0], "itemcode:", record[1], "warehouseCode:", record[2])
counter += 1;
finally:
cur.close()
# 処理を実行する
def execSQL():
# DBのコネクションを作成
conn = createConnection()
if conn == None:
return
try:
execSelectFetch(conn)
except mariadb.Error as e:
print(f"DBエラー:{e}")
finally:
conn.close()
# execCRUDを実行
execSQL()