pythonでDB処理(MariaDB)

更新日:2022年1月16日

前提

pythonからMariaDBにアクセスする処理について記載します。
使用するバージョンは、これを記載している時点での最新を使用します。
python3.10
MariaDB10.6.5
MariaDB Connector/Python(1.0)

MariaDB Connector/Pythonについては、以下にドキュメントがあります。
MariaDBの各種言語向けのコネクタドキュメント:https://mariadb.com/docs/clients/mariadb-connectors/
python用コネクタのドキュメントトップページ:https://mariadb.com/docs/clients/mariadb-connectors/connector-python/
python用のコネクタのAPIリファレンス:https://mariadb.com/docs/reference/conpy/api/

サンプルコードでは、MariaDB上に、「testdb」というデータベースを作成し、そこに以下のテーブルを作成したものに、処理を行っています。
各テーブルに、特に深い意味は無く、なんとなく作ったものです。


create table Item(
	code varchar(32),
	itemName varchar(64),
	createTime timestamp,

	PRIMARY KEY(code)
);

create table warehouse(
	code varchar(16),
	wareHouseName varchar(256),

	PRIMARY KEY(code)
);

create table Stock(
	itemCode varchar(32),
	warehouseCode varchar(16),
	quantity int,
	lastUpdate timestamp,

	PRIMARY KEY(itemCode, warehouseCode),
	FOREIGN KEY fk_Stock_itemCode(itemCode) REFERENCES Item(code),
	FOREIGN KEY fk_Stock_warehouse(warehouseCode) REFERENCES warehouse(code)
);

create table StockJournal(
    id bigint auto_increment,
    itemCode varchar(32),
    warehouseCode varchar(16),

    quantity int,
    journalDate timestamp,

    PRIMARY KEY(id),
    FOREIGN KEY fk_StockJournal_itemCode(itemCode) REFERENCES Stock(itemCode),
    FOREIGN KEY fk_StockJournal_warehouse(warehouseCode) REFERENCES Stock(warehouseCode)
);

CREATE VIEW V_ITEM_STOCK(
	itemCode,
	itemName,
	warehouseCode,
	warehouseName,
	quantity
) AS
	SELECT 
		a.code,
		a.itemName,
		c.code,
		c.warehouseName,
		b.quantity
	FROM item a
	INNER JOIN stock b
	ON a.code = b.itemCode
	INNER JOIN warehouse c
	ON b.warehouseCode = c.code
	ORDER BY a.code, c.code;

環境構築

pythonからMariaDBにアクセスするには、ドライバが必要です(どのDBでも同じですが)。
MariaDBについては、以下に公式からドキュメントが提供されていますので、これに沿って作業すればOKです。
 → https://mariadb.com/ja/resources/blog/how-to-connect-python-programs-to-mariadb/

コマンドプロンプトにて、以下のコマンドを実行することで、追加されます。

pip.exe install mariadb

※pipのコマンドへパスが通ってない場合は、pip.exeのパスを絶対パスで指定してあげればOK。
デフォルトでは以下のパスにあるはずです。
C:\Users\(ユーザー名)\AppData\Local\Programs\Python\Python310\Scripts\pip.exe

SELECTの基本

WHERE句無しと、有りでSELECT文を実行しています。
ポイントとしては、WHERE句のところで、「?」というプレースホルダを使用しています。
このように記載することで、ドライバ側でサニタイジングしてくれるとのことです。
SQLインジェクション対策としては、この書き方にしていればOKな様子。

また、「print("実行したSQL:", cur.statement)」で実行したSQLの表示を行っていますが、これはプレースホルダがバインドされた値に置き換わっていない状態(つまり「?」のまま)となります。
置き換わった状態のSQLを表示するのは無理な様子です(クエリを送った後、パラメータについてはバイナリで送るからとかなんとか・・・。詳細は良く知りません。)。


import mariadb

# DBのコネクションを返す
def createConnection():
    try:
        conn = mariadb.connect(
            user='root',            # MariaDBのユーザーID
            password='password',    # MariaDBのrootユーザーのパスワード
            host='127.0.0.1',       # MariaDBのサーバーアドレス
            port=3306,              # MariaDBのポート番号
            database='testdb'       # デフォルトで使用するDB
        )
    except mariadb.Error as e:
        print(f"DB接続エラー:{e}")
        return None
    
    return conn

# 全レコードをSELECTを実行する
def execSelectAll(conn):

    sql = 'SELECT code, itemName, createTime FROM item'

    cur = conn.cursor()
    try:
        # SQLを実行する
        cur.execute(sql)
        # SELECT結果を取得する
        for record in cur:
            print('code=', record[0], ' itemName=', record[1], ' createTime=', record[2])
    finally:
        cur.close()

# 条件を指定してSELECTを実行する
def execSelectWhere(conn, code):
    # プレースホルダ(「?」の部分)を設けたSQL
    sql = 'SELECT code, itemName, createTime FROM item WHERE code = ?'
    # 上記のプレースホルダーにセットするタプル
    param = (code,) # 要素が1個しかない場合、末尾の「,」を忘れずにつけること

    cur = conn.cursor()
    try:
        # SQLを実行する
        cur.execute(sql, param)
        print("実行したSQL:", cur.statement)
        # SELECT結果を取得する
        for record in cur:
            print('code=', record[0], ' itemName=', record[1], ' createTime=', record[2])
    finally:
        cur.close()

# 処理を実行する
def execSQL():
    # DBのコネクションを作成
    conn = createConnection()
    if conn == None:
        return
    
    try:
        print("■全件をselect")
        execSelectAll(conn)
        print("■where句を追加してselect")
        execSelectWhere(conn, 'ITM0001')

    except mariadb.Error as e:
        print(f"DBエラー:{e}")
    finally:
        conn.close()


# execSQLを実行
execSQL()

実行結果

■全件をselect
code= ITM0001  itemName= 柔らかい石  createTime= 2022-01-09 19:24:36
code= ITM0002  itemName= 硬い石  createTime= 2022-01-09 19:24:36
code= ITM0003  itemName= 不思議な石  createTime= 2022-01-09 19:24:36
■where句を追加してselect
実行したSQL: SELECT code, itemName, createTime FROM item WHERE code = ?
code= ITM0001  itemName= 柔らかい石  createTime= 2022-01-09 19:24:36

SELECT結果からテーブル情報を取得する

SELECTした結果から、カラム名や型の情報が取得できます。
カーソルのdescriptionから、カラムの情報が取得でき、配列になっています。
以下のデータの並びになっている様子です。

配列のindex 格納されている値
0 name:カラム名
1 type_code:カラムの型コード(mariadb.fieldinfo().type関数で、型名に変換可能)。またこの値は、定数mariadb.constants.FIELD_TYPEに定義されている。
2 display_size:カラムのサイズ。varchar(32)なら、32がセットされている様子。
3 internal_size:何のサイズか不明・・・。
4 precision:数値型とかの精度のことかな・・・?
5 scale:数値型とかの精度のことかな・・・?
6 null_ok:not null制約があるかないかをTrue/Flaseで返している様子。
8 field_flags:フィールドのフラグ情報。定数mariadb.constants.FIELD_FLAGがあるはずなんだけど、見つからず・・・。なので、謎。

import mariadb
import mariadb.constants    # ソース中では使用していないが、「FIELD_TYPE」などが使えるようになる

# DBのコネクションを返す
def createConnection():
    """
    DB接続オブジェクトを返す.
    """
    try:
        conn = mariadb.connect(
            user='root',            # MariaDBのユーザーID
            password='password',    # MariaDBのrootユーザーのパスワード
            host='127.0.0.1',       # MariaDBのサーバーアドレス
            port=3306,              # MariaDBのポート番号
            database='testdb'       # デフォルトで使用するDB
        )
    except mariadb.Error as e:
        print(f"DB接続エラー:{e}")
        return None
    
    return conn

# selectした結果のと、テーブル情報を取得する
def execSelect(conn):
    cur = conn.cursor()

    # SQLを実行する
    cur.execute('SELECT code, itemName, createTime FROM item')

    # カラムの型のIDから、カラムの型名に変換してくれるオブジェクトを取得
    fInfo = mariadb.fieldinfo()

    print("■SELECT結果のカラム情報")
    # 1カラム毎の情報を表示する
    for fieldDesc in cur.description:
        # 1つのカラムが持っている情報を表示する
        for desc in fieldDesc:
            print(desc, end=',')
        
        # カラムの型を表示
        print("")
        print('  カラムの型:', fInfo.type(fieldDesc))
        print('  カラムのフラグ情報:', fInfo.flag(fieldDesc))
    
    print("■SELECTしたレコードの情報")
    # SELECT結果を取得する
    for record in cur:
        print('code=', record[0], ' itemName=', record[1], ' createTime=', record[2])

# 処理を実行する
def execSQL():
    # DBのコネクションを作成
    conn = createConnection()
    if conn == None:
        return
    
    try:
        execSelect(conn)
    except mariadb.Error as e:
        print(f"DBエラー:{e}")
    finally:
        conn.close()


# execSQLを実行
execSQL()

実行結果

■SELECT結果のカラム情報
code,253,32,128,0,0,False,20611,
  カラムの型: VAR_STRING
  カラムのフラグ情報: NOT_NULL | PRIMARY_KEY | BINARY | NO_DEFAULT
itemName,253,64,256,0,0,True,128,
  カラムの型: VAR_STRING
  カラムのフラグ情報: BINARY
createTime,7,4,19,0,0,False,9377,
  カラムの型: TIMESTAMP
  カラムのフラグ情報: NOT_NULL | UNSIGNED | BINARY | TIMESTAMP | UPDATE_TIMESTAMP
■SELECTしたレコードの情報
code= ITM0001  itemName= 柔らかい石  createTime= 2022-01-09 19:24:36
code= ITM0002  itemName= 硬い石  createTime= 2022-01-09 19:24:36
code= ITM0003  itemName= 不思議な石  createTime= 2022-01-09 19:24:36

INSERT/UPDATE/DELETEを実行する(autocommit)

トランザクションは使用せず、autocommitでINSERT/UPDATE/DELETEをやってみたいと思います。
DB接続を行う際、「autocommit=True」のオプションを追加するだけでOKです。
SQL実行後、即座に結果が反映されます。
また、autocommitの話とは別ですが、cursorのexecuteメソッドとexecutemanyメソッドの使い方についても記載しています。


import mariadb
import mariadb.constants    # ソース中では使用していないが、「FIELD_TYPE」などが使えるようになる

# DBのコネクションを返す
def createConnection():
    """
    DB接続オブジェクトを返す.
    """
    try:
        conn = mariadb.connect(
            user='root',            # MariaDBのユーザーID
            password='password',    # MariaDBのrootユーザーのパスワード
            host='127.0.0.1',       # MariaDBのサーバーアドレス
            port=3306,              # MariaDBのポート番号
            database='testdb',      # デフォルトで使用するDB
            autocommit=True         # autocommitを使用する(デフォルトではFalse)
        )
    except mariadb.Error as e:
        print(f"DB接続エラー:{e}")
        return None
    
    return conn

# insertを実行
def execInsert(conn):
    sql = 'INSERT INTO StockJournal(itemCode, warehouseCode, quantity, journalDate) VALUES(?, ?, ?, CURRENT_TIME)'
    param1 = ('ITM0001', 'T1', 100)
    param2 = [
        ('ITM0001', 'T1', 100),
        ('ITM0002', 'T1', 50),
        ('ITM0003', 'C1', 200),
    ]

    cur = conn.cursor()
    try:
        # 1レコードのみinsert
        cur.execute(sql, param1)
        # 複数レコードinsert
        cur.executemany(sql, param2)
    except Exception as e:
        print(f"insertの処理に失敗しました:{e}")
    finally:
        cur.close()

# updateを実行
def execUpdate(conn):
    sql = 'UPDATE Item SET itemName= ? WHERE code = ?'
    param1 = ('硬い石new', 'ITM0002')
    param2 = [
        ('柔らかい石new', 'ITM0001'),
        ('不思議な石new', 'ITM0003'),
    ]

    cur = conn.cursor()
    try:
        # 1レコードのみinsert
        cur.execute(sql, param1)
        # 複数レコードinsert
        cur.executemany(sql, param2)
    except Exception as e:
        print(f"updateの処理に失敗しました:{e}")
    finally:
        cur.close()

# deleteを実行
def execDelete(conn):
    sql = 'DELETE FROM StockJournal WHERE itemCode = ?'
    param1 = ('ITM0001',)
    param2 = [
        ('ITM0002',),
        ('ITM0003',),
    ]

    cur = conn.cursor()
    try:
        # 1レコードのみinsert
        cur.execute(sql, param1)
        # 複数レコードinsert
        cur.executemany(sql, param2)
    except Exception as e:
        print(f"deleteの処理に失敗しました:{e}")
    finally:
        cur.close()


# 処理を実行する
def execSQL():
    # DBのコネクションを作成
    conn = createConnection()
    if conn == None:
        return
    
    try:
        execInsert(conn)
        execUpdate(conn)
        execDelete(conn)
    except mariadb.Error as e:
        print(f"DBエラー:{e}")
    finally:
        conn.close()


# execCRUDを実行
execSQL()

トランザクションを使用する

DBとのコネクションを作成する際、オプションでautocommitを有効にするかどうかを指定するオプションがあります。
ここにFalseを指定することで、トランザクションを使えるようになります。

以下に例を示します。


import mariadb
import mariadb.constants    # ソース中では使用していないが、「FIELD_TYPE」などが使えるようになる

# DBのコネクションを返す
def createConnection():
    """
    DB接続オブジェクトを返す.
    """
    try:
        conn = mariadb.connect(
            user='root',            # MariaDBのユーザーID
            password='password',    # MariaDBのrootユーザーのパスワード
            host='127.0.0.1',       # MariaDBのサーバーアドレス
            port=3306,              # MariaDBのポート番号
            database='testdb',      # デフォルトで使用するDB
            autocommit=False        # autocommitを無効にする(デフォルトで無効になっているが念のため)
        )
    except mariadb.Error as e:
        print(f"DB接続エラー:{e}")
        return None
    
    return conn

# insertを実行
def execInsert(conn):
    sql = 'INSERT INTO StockJournal(itemCode, warehouseCode, quantity, journalDate) VALUES(?, ?, ?, CURRENT_TIME)'
    param1 = ('ITM0001', 'T1', 100)
    param2 = [
        ('ITM0001', 'T1', 100),
        ('ITM0002', 'T1', 50),
        ('ITM0003', 'C1', 200),
    ]

    cur = conn.cursor()
    try:
        # 1レコードのみinsert
        cur.execute(sql, param1)
        # 複数レコードinsert
        cur.executemany(sql, param2)
    except Exception as e:
        print(f"insertの処理に失敗しました:{e}")
    finally:
        cur.close()

# updateを実行
def execUpdate(conn):
    sql = 'UPDATE Item SET itemName= ? WHERE code = ?'
    param1 = ('硬い石new', 'ITM0002')
    param2 = [
        ('柔らかい石new', 'ITM0001'),
        ('不思議な石new', 'ITM0003'),
    ]

    cur = conn.cursor()
    try:
        # 1レコードのみinsert
        cur.execute(sql, param1)
        # 複数レコードinsert
        cur.executemany(sql, param2)
    except Exception as e:
        print(f"updateの処理に失敗しました:{e}")
    finally:
        cur.close()

# deleteを実行
def execDelete(conn):
    sql = 'DELETE FROM StockJournal WHERE itemCode = ?'
    param1 = ('ITM0001',)
    param2 = [
        ('ITM0002',),
        ('ITM0003',),
    ]

    cur = conn.cursor()
    try:
        # 1レコードのみinsert
        cur.execute(sql, param1)
        # 複数レコードinsert
        cur.executemany(sql, param2)
    except Exception as e:
        print(f"deleteの処理に失敗しました:{e}")
    finally:
        cur.close()


# 処理を実行する
def execSQL():
    # DBのコネクションを作成
    conn = createConnection()
    if conn == None:
        return
    
    try:
        execInsert(conn)
        execUpdate(conn)
        # commitを実行する
        conn.commit()
        
        execDelete(conn)
        # commitを実行する
        conn.commit()
    except mariadb.Error as e:
        print(f"DBエラー:{e}")
        # エラーが発生している場合rollbackを実行する
        conn.rollback()
    finally:
        conn.close()


# execCRUDを実行
execSQL()

カーソルを使用する

SELECTした結果をフェッチする処理の例です。
executeでクエリを実行した後、fetchmanyメソッドで指定した数のレコード(サンプルでは100件)を取得しています。


import mariadb

# DBのコネクションを返す
def createConnection():
    try:
        conn = mariadb.connect(
            user='TestUser1',       # MariaDBのユーザーID
            password='password',    # MariaDBのrootユーザーのパスワード
            host='192.168.11.50',   # MariaDBのサーバーアドレス
            port=3306,              # MariaDBのポート番号
            database='TestDB',      # デフォルトで使用するDB
        )
    except mariadb.Error as e:
        print(f"DB接続エラー:{e}")
        return None
    
    return conn

# selectを実行
def execSelectFetch(conn):
    sql = 'SELECT id, itemCode, warehouseCode, quantity, journalDate FROM StockJournal ORDER BY ID'

    cur = conn.cursor()
    try:
        # SQLを実行する
        cur.execute(sql)
        print("実行したSQL:", cur.statement)

        counter = 0
        # SELECT結果を取得する
        while True:
            # 引数で指定した数だけ、レコードを取得する
            recordList = cur.fetchmany(100)

            # 取得したレコードが0件の場合ループを抜ける
            if not recordList:
                break;
            
            # なんとなく1,000,000件に1回ぐらいはレコードの情報を表示してみる
            if counter % 10000 == 0:
                # 100件フェッチしている中から1番目のデータを取得して、表示
                record = recordList[0] 
                print("id:",record[0], "itemcode:", record[1], "warehouseCode:", record[2])

            counter += 1;
    finally:
        cur.close()

# 処理を実行する
def execSQL():
    # DBのコネクションを作成
    conn = createConnection()
    if conn == None:
        return
    
    try:
        execSelectFetch(conn)
    except mariadb.Error as e:
        print(f"DBエラー:{e}")
    finally:
        conn.close()


# execCRUDを実行
execSQL()

ページの一番上へ