1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
| import pymysql import time import threading from contextlib import contextmanager
class TransactionManager:
    """Balance updates on an ``accounts`` table using two locking strategies.

    Every public operation opens its own short-lived pymysql connection built
    from ``db_config`` and reports its outcome as a ``(success, message)``
    tuple rather than raising.  Rows are read by position, so the code
    assumes the connection uses a tuple-returning cursor.
    """

    def __init__(self, db_config):
        """Store the keyword arguments later passed to ``pymysql.connect``."""
        self.db_config = db_config

    @contextmanager
    def get_connection(self):
        """Yield a new connection and always close it on exit."""
        conn = pymysql.connect(**self.db_config)
        try:
            yield conn
        finally:
            conn.close()

    def optimistic_update(self, account_id, new_balance, max_retries=3):
        """Set an account's balance using a version-checked (optimistic) write.

        The row's ``version`` is read first; the UPDATE only matches when the
        version is still the same and increments it.  When no row matches,
        another writer got there first and the attempt is retried with
        exponential backoff.

        Args:
            account_id: Value matched against ``accounts.id``.
            new_balance: Balance to store.
            max_retries: Maximum number of read/write attempts.

        Returns:
            ``(True, message)`` on success, otherwise ``(False, message)``
            describing a missing account, a database error, or exhausted
            retries.
        """
        for attempt in range(max_retries):
            try:
                with self.get_connection() as conn:
                    with conn.cursor() as cursor:
                        cursor.execute(
                            "SELECT version FROM accounts WHERE id = %s",
                            (account_id,),
                        )
                        row = cursor.fetchone()
                        if not row:
                            return False, "账户不存在"
                        version = row[0]

                        cursor.execute(
                            "UPDATE accounts "
                            "SET balance = %s, version = version + 1 "
                            "WHERE id = %s AND version = %s",
                            (new_balance, account_id, version),
                        )
                        if cursor.rowcount == 1:
                            conn.commit()
                            return True, "更新成功"
                    # Version conflict: end this transaction before retrying.
                    conn.rollback()
            except Exception as e:
                return False, f"更新失败: {str(e)}"

            # Back off only after the connection has been released, and only
            # when another attempt will actually follow.
            if attempt < max_retries - 1:
                time.sleep(0.01 * (2 ** attempt))

        return False, "重试次数超限"

    def pessimistic_transfer(self, from_id, to_id, amount):
        """Move ``amount`` between two accounts under row locks.

        Both rows are locked with ``SELECT ... FOR UPDATE`` in ascending id
        order, so concurrent transfers acquire locks in the same order.  The
        transfer is committed only when the amount is positive, both accounts
        exist, and the source balance covers the amount.

        Args:
            from_id: Id of the account to debit.
            to_id: Id of the account to credit.
            amount: Amount to move; must be greater than zero.

        Returns:
            ``(True, message)`` on success, otherwise ``(False, message)``
            describing the validation failure or database error.
        """
        try:
            # A non-positive amount would pass the balance check and move
            # money in the opposite direction, so reject it before connecting.
            if amount <= 0:
                return False, "转账金额必须大于0"

            with self.get_connection() as conn:
                with conn.cursor() as cursor:
                    conn.begin()

                    # Lock each distinct row once, keeping the balance that
                    # was read under the lock.
                    balances = {}
                    for account_id in sorted({from_id, to_id}):
                        cursor.execute(
                            "SELECT balance FROM accounts "
                            "WHERE id = %s FOR UPDATE",
                            (account_id,),
                        )
                        row = cursor.fetchone()
                        if row is None:
                            # Without this check a missing destination would
                            # debit the source and credit nobody.
                            conn.rollback()
                            return False, "账户不存在"
                        balances[account_id] = row[0]

                    if balances[from_id] < amount:
                        conn.rollback()
                        return False, "余额不足"

                    # Bump version as well so concurrent optimistic_update
                    # callers notice that the balance changed.
                    cursor.execute(
                        "UPDATE accounts "
                        "SET balance = balance - %s, version = version + 1 "
                        "WHERE id = %s",
                        (amount, from_id),
                    )
                    cursor.execute(
                        "UPDATE accounts "
                        "SET balance = balance + %s, version = version + 1 "
                        "WHERE id = %s",
                        (amount, to_id),
                    )

                    conn.commit()
                    return True, "转账成功"
        except Exception as e:
            # The connection is closed without a commit on this path, which
            # leaves the partial transfer uncommitted.
            return False, f"转账失败: {str(e)}"
if __name__ == "__main__":
    # Demo entry point.  The connection settings are placeholders and must be
    # replaced with real credentials before running.
    connection_settings = {
        'host': 'localhost',
        'user': 'your_user',
        'password': 'your_password',
        'database': 'your_database',
        'charset': 'utf8mb4',
    }
    manager = TransactionManager(connection_settings)

    # Optimistic path: overwrite the balance of account 1.
    success, message = manager.optimistic_update(1, 1500.00)
    print(f"乐观锁更新: {success}, {message}")

    # Pessimistic path: move 100.00 from account 1 to account 2.
    success, message = manager.pessimistic_transfer(1, 2, 100.00)
    print(f"悲观锁转账: {success}, {message}")
|