MySQL事务与锁机制深度解析:并发控制实战指南
Orion K Lv6

事务和锁机制是MySQL并发控制的核心,正确理解和使用事务锁可以确保数据一致性,避免并发问题。本文将深入探讨MySQL的事务特性、锁机制和并发控制策略。

事务基础理论

1. ACID 特性详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
-- 创建测试表
CREATE TABLE accounts (
id INT PRIMARY KEY AUTO_INCREMENT,
account_number VARCHAR(20) UNIQUE NOT NULL,
balance DECIMAL(10,2) NOT NULL DEFAULT 0.00,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

CREATE TABLE transactions (
id INT PRIMARY KEY AUTO_INCREMENT,
from_account_id INT,
to_account_id INT,
amount DECIMAL(10,2) NOT NULL,
transaction_type ENUM('transfer', 'deposit', 'withdrawal') NOT NULL,
status ENUM('pending', 'completed', 'failed') DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (from_account_id) REFERENCES accounts(id),
FOREIGN KEY (to_account_id) REFERENCES accounts(id)
);

-- 插入测试数据
INSERT INTO accounts (account_number, balance) VALUES
('ACC001', 1000.00),
('ACC002', 500.00),
('ACC003', 2000.00);

-- 1. 原子性 (Atomicity) 演示
START TRANSACTION;

-- 转账操作:从账户1转500到账户2
UPDATE accounts SET balance = balance - 500.00 WHERE id = 1;
UPDATE accounts SET balance = balance + 500.00 WHERE id = 2;

-- 记录交易
INSERT INTO transactions (from_account_id, to_account_id, amount, transaction_type, status)
VALUES (1, 2, 500.00, 'transfer', 'completed');

-- 检查余额是否足够
SELECT balance FROM accounts WHERE id = 1;

-- 如果余额不足,回滚整个事务
-- ROLLBACK;

-- 如果一切正常,提交事务
COMMIT;

-- 2. 一致性 (Consistency) 演示
-- 创建触发器确保数据一致性
DELIMITER //
CREATE TRIGGER check_balance_before_update
BEFORE UPDATE ON accounts
FOR EACH ROW
BEGIN
IF NEW.balance < 0 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = '余额不能为负数';
END IF;
END //
DELIMITER ;

-- 3. 隔离性 (Isolation) 演示
-- 会话1
START TRANSACTION;
SELECT balance FROM accounts WHERE id = 1; -- 读取初始余额
-- 暂停,让会话2执行

-- 会话2
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT;

-- 回到会话1
SELECT balance FROM accounts WHERE id = 1; -- 根据隔离级别,可能看到不同结果
COMMIT;

-- 4. 持久性 (Durability) 演示
-- 一旦事务提交,数据就永久保存
START TRANSACTION;
UPDATE accounts SET balance = 1500.00 WHERE id = 1;
COMMIT;

-- 即使数据库重启,数据也不会丢失

2. 事务隔离级别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
-- 查看当前隔离级别
SELECT @@transaction_isolation;
SHOW VARIABLES LIKE 'transaction_isolation';

-- 设置隔离级别
-- 1. READ UNCOMMITTED (读未提交)
SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;

-- 2. READ COMMITTED (读已提交)
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;

-- 3. REPEATABLE READ (可重复读) - MySQL默认
SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;

-- 4. SERIALIZABLE (串行化)
SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE;

-- 隔离级别测试
-- 创建测试场景
CREATE TABLE isolation_test (
id INT PRIMARY KEY,
value INT NOT NULL
);

INSERT INTO isolation_test VALUES (1, 100);

-- 脏读测试 (READ UNCOMMITTED)
-- 会话1
SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
START TRANSACTION;
SELECT * FROM isolation_test WHERE id = 1; -- 读取初始值

-- 会话2
START TRANSACTION;
UPDATE isolation_test SET value = 200 WHERE id = 1; -- 未提交
-- 不要提交

-- 回到会话1
SELECT * FROM isolation_test WHERE id = 1; -- 可能读到未提交的值200 (脏读)
COMMIT;

-- 不可重复读测试 (READ COMMITTED)
-- 会话1
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;
START TRANSACTION;
SELECT * FROM isolation_test WHERE id = 1; -- 第一次读取

-- 会话2
START TRANSACTION;
UPDATE isolation_test SET value = 300 WHERE id = 1;
COMMIT; -- 提交更改

-- 回到会话1
SELECT * FROM isolation_test WHERE id = 1; -- 第二次读取,值可能不同 (不可重复读)
COMMIT;

-- 幻读测试 (REPEATABLE READ)
-- 会话1
SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;
START TRANSACTION;
SELECT * FROM isolation_test WHERE value > 50; -- 第一次查询

-- 会话2
START TRANSACTION;
INSERT INTO isolation_test VALUES (2, 150); -- 插入新行
COMMIT;

-- 回到会话1
SELECT * FROM isolation_test WHERE value > 50; -- 第二次查询,可能出现新行 (幻读)
COMMIT;

锁机制详解

1. 锁的类型和级别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
-- 查看当前锁信息
SELECT * FROM performance_schema.data_locks;
SELECT * FROM performance_schema.data_lock_waits;

-- 表级锁
-- 1. 表共享读锁
LOCK TABLES accounts READ;
SELECT * FROM accounts; -- 可以读取
-- UPDATE accounts SET balance = 1000 WHERE id = 1; -- 会报错
UNLOCK TABLES;

-- 2. 表独占写锁
LOCK TABLES accounts WRITE;
SELECT * FROM accounts; -- 可以读取
UPDATE accounts SET balance = 1000 WHERE id = 1; -- 可以写入
UNLOCK TABLES;

-- 行级锁演示
-- 1. 共享锁 (S锁)
START TRANSACTION;
SELECT * FROM accounts WHERE id = 1 LOCK IN SHARE MODE;
-- 其他事务可以读取,但不能修改这一行
-- COMMIT;

-- 2. 排他锁 (X锁)
START TRANSACTION;
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;
-- 其他事务不能读取或修改这一行,直到当前事务结束
-- COMMIT;

-- 3. 意向锁演示
-- 意向锁是表级锁,用于表示事务准备在表的某些行上加锁
-- MySQL自动管理意向锁,无需手动操作

-- 查看锁等待情况
SELECT
r.trx_id waiting_trx_id,
r.trx_mysql_thread_id waiting_thread,
r.trx_query waiting_query,
b.trx_id blocking_trx_id,
b.trx_mysql_thread_id blocking_thread,
b.trx_query blocking_query
FROM information_schema.innodb_lock_waits w
INNER JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id
INNER JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id;

2. 死锁检测和处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
-- 创建死锁场景
-- 会话1
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
-- 暂停,让会话2执行

-- 会话2
START TRANSACTION;
UPDATE accounts SET balance = balance - 50 WHERE id = 2;
-- 现在尝试更新会话1锁定的行
UPDATE accounts SET balance = balance + 50 WHERE id = 1; -- 等待

-- 回到会话1,尝试更新会话2锁定的行
UPDATE accounts SET balance = balance + 100 WHERE id = 2; -- 死锁!

-- MySQL会自动检测死锁并回滚其中一个事务

-- 查看死锁信息
SHOW ENGINE INNODB STATUS;

-- 死锁预防策略
-- 1. 按固定顺序访问资源
START TRANSACTION;
-- 总是先锁定ID较小的账户
SELECT * FROM accounts WHERE id IN (1, 2) ORDER BY id FOR UPDATE;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;

-- 2. 减少事务持有锁的时间
START TRANSACTION;
-- 尽快完成操作
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT; -- 立即提交

-- 3. 使用较低的隔离级别
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;

3. 锁监控和诊断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
-- 查看当前锁信息 (MySQL 8.0+)
SELECT
ENGINE_LOCK_ID,
ENGINE_TRANSACTION_ID,
THREAD_ID,
OBJECT_SCHEMA,
OBJECT_NAME,
LOCK_TYPE,
LOCK_MODE,
LOCK_STATUS,
LOCK_DATA
FROM performance_schema.data_locks
ORDER BY ENGINE_TRANSACTION_ID;

-- 查看锁等待信息
SELECT
REQUESTING_ENGINE_LOCK_ID,
REQUESTING_ENGINE_TRANSACTION_ID,
REQUESTING_THREAD_ID,
BLOCKING_ENGINE_LOCK_ID,
BLOCKING_ENGINE_TRANSACTION_ID,
BLOCKING_THREAD_ID
FROM performance_schema.data_lock_waits;

-- 查看事务信息
SELECT
trx_id,
trx_state,
trx_started,
trx_requested_lock_id,
trx_wait_started,
trx_weight,
trx_mysql_thread_id,
trx_query
FROM information_schema.innodb_trx;

-- 创建锁监控视图
CREATE VIEW lock_monitor AS
SELECT
dl.ENGINE_TRANSACTION_ID as trx_id,
dl.THREAD_ID,
dl.OBJECT_SCHEMA as db_name,
dl.OBJECT_NAME as table_name,
dl.LOCK_TYPE,
dl.LOCK_MODE,
dl.LOCK_STATUS,
dl.LOCK_DATA,
it.trx_started,
it.trx_query
FROM performance_schema.data_locks dl
LEFT JOIN information_schema.innodb_trx it ON dl.ENGINE_TRANSACTION_ID = it.trx_id
ORDER BY dl.ENGINE_TRANSACTION_ID;

-- 使用监控视图
SELECT * FROM lock_monitor WHERE LOCK_STATUS = 'WAITING';

并发控制策略

1. 乐观锁实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
-- 添加版本号字段
ALTER TABLE accounts ADD COLUMN version INT DEFAULT 1;

-- 乐观锁更新示例
-- 1. 读取数据和版本号
SELECT id, balance, version FROM accounts WHERE id = 1;

-- 2. 业务逻辑处理
-- 假设读取到 balance = 1000, version = 1

-- 3. 更新时检查版本号
UPDATE accounts
SET balance = 900, version = version + 1
WHERE id = 1 AND version = 1;

-- 4. 检查影响行数
SELECT ROW_COUNT(); -- 如果返回0,说明版本冲突

-- 乐观锁的存储过程实现
DELIMITER //
CREATE PROCEDURE optimistic_transfer(
IN from_account_id INT,
IN to_account_id INT,
IN transfer_amount DECIMAL(10,2),
OUT result_code INT,
OUT result_message VARCHAR(255)
)
BEGIN
DECLARE from_balance DECIMAL(10,2);
DECLARE from_version INT;
DECLARE to_version INT;
DECLARE affected_rows INT;

DECLARE EXIT HANDLER FOR SQLEXCEPTION
BEGIN
ROLLBACK;
SET result_code = -1;
SET result_message = '转账失败:系统错误';
END;

START TRANSACTION;

-- 读取源账户信息
SELECT balance, version INTO from_balance, from_version
FROM accounts WHERE id = from_account_id;

-- 检查余额
IF from_balance < transfer_amount THEN
SET result_code = 1;
SET result_message = '余额不足';
ROLLBACK;
ELSE
-- 读取目标账户版本
SELECT version INTO to_version FROM accounts WHERE id = to_account_id;

-- 更新源账户
UPDATE accounts
SET balance = balance - transfer_amount, version = version + 1
WHERE id = from_account_id AND version = from_version;

SET affected_rows = ROW_COUNT();

IF affected_rows = 0 THEN
SET result_code = 2;
SET result_message = '源账户数据已被修改,请重试';
ROLLBACK;
ELSE
-- 更新目标账户
UPDATE accounts
SET balance = balance + transfer_amount, version = version + 1
WHERE id = to_account_id AND version = to_version;

SET affected_rows = ROW_COUNT();

IF affected_rows = 0 THEN
SET result_code = 3;
SET result_message = '目标账户数据已被修改,请重试';
ROLLBACK;
ELSE
SET result_code = 0;
SET result_message = '转账成功';
COMMIT;
END IF;
END IF;
END IF;
END //
DELIMITER ;

-- 使用乐观锁转账
CALL optimistic_transfer(1, 2, 100.00, @code, @msg);
SELECT @code, @msg;

2. 悲观锁实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
-- 悲观锁转账实现
DELIMITER //
CREATE PROCEDURE pessimistic_transfer(
IN from_account_id INT,
IN to_account_id INT,
IN transfer_amount DECIMAL(10,2),
OUT result_code INT,
OUT result_message VARCHAR(255)
)
BEGIN
DECLARE from_balance DECIMAL(10,2);

DECLARE EXIT HANDLER FOR SQLEXCEPTION
BEGIN
ROLLBACK;
SET result_code = -1;
SET result_message = '转账失败:系统错误';
END;

START TRANSACTION;

-- 按ID顺序锁定账户,避免死锁
IF from_account_id < to_account_id THEN
SELECT balance INTO from_balance FROM accounts
WHERE id = from_account_id FOR UPDATE;

SELECT id FROM accounts WHERE id = to_account_id FOR UPDATE;
ELSE
SELECT id FROM accounts WHERE id = to_account_id FOR UPDATE;

SELECT balance INTO from_balance FROM accounts
WHERE id = from_account_id FOR UPDATE;
END IF;

-- 检查余额
IF from_balance < transfer_amount THEN
SET result_code = 1;
SET result_message = '余额不足';
ROLLBACK;
ELSE
-- 执行转账
UPDATE accounts SET balance = balance - transfer_amount
WHERE id = from_account_id;

UPDATE accounts SET balance = balance + transfer_amount
WHERE id = to_account_id;

SET result_code = 0;
SET result_message = '转账成功';
COMMIT;
END IF;
END //
DELIMITER ;

-- 使用悲观锁转账
CALL pessimistic_transfer(1, 2, 100.00, @code, @msg);
SELECT @code, @msg;

3. 分布式锁实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
-- 使用MySQL实现分布式锁
CREATE TABLE distributed_locks (
lock_name VARCHAR(64) PRIMARY KEY,
lock_holder VARCHAR(64) NOT NULL,
lock_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expire_time TIMESTAMP NOT NULL,
INDEX idx_expire_time (expire_time)
);

-- 获取分布式锁的存储过程
DELIMITER //
CREATE PROCEDURE acquire_lock(
IN lock_name VARCHAR(64),
IN holder VARCHAR(64),
IN timeout_seconds INT,
OUT success BOOLEAN
)
BEGIN
DECLARE lock_count INT DEFAULT 0;
DECLARE current_time TIMESTAMP DEFAULT NOW();
DECLARE expire_time TIMESTAMP DEFAULT DATE_ADD(NOW(), INTERVAL timeout_seconds SECOND);

DECLARE EXIT HANDLER FOR SQLEXCEPTION
BEGIN
SET success = FALSE;
END;

-- 清理过期锁
DELETE FROM distributed_locks WHERE expire_time < current_time;

-- 尝试获取锁
INSERT INTO distributed_locks (lock_name, lock_holder, expire_time)
VALUES (lock_name, holder, expire_time);

SET success = TRUE;
END //

CREATE PROCEDURE release_lock(
IN lock_name VARCHAR(64),
IN holder VARCHAR(64),
OUT success BOOLEAN
)
BEGIN
DECLARE affected_rows INT DEFAULT 0;

DELETE FROM distributed_locks
WHERE lock_name = lock_name AND lock_holder = holder;

SET affected_rows = ROW_COUNT();
SET success = (affected_rows > 0);
END //
DELIMITER ;

-- 使用分布式锁
CALL acquire_lock('transfer_lock_1_2', 'session_123', 30, @acquired);
SELECT @acquired;

-- 执行业务逻辑
-- ...

-- 释放锁
CALL release_lock('transfer_lock_1_2', 'session_123', @released);
SELECT @released;

性能优化技巧

1. 减少锁竞争

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
-- 1. 缩短事务时间
-- 不好的做法
START TRANSACTION;
SELECT SLEEP(5); -- 模拟长时间操作
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT;

-- 好的做法
-- 先完成计算,再开始事务
-- 计算逻辑...
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT;

-- 2. 避免大事务
-- 不好的做法:一次性处理大量数据
START TRANSACTION;
UPDATE accounts SET balance = balance * 1.05; -- 更新所有账户
COMMIT;

-- 好的做法:分批处理
DELIMITER //
CREATE PROCEDURE batch_update_balance()
BEGIN
DECLARE done INT DEFAULT FALSE;
DECLARE account_id INT;
DECLARE batch_size INT DEFAULT 100;
DECLARE processed INT DEFAULT 0;

DECLARE account_cursor CURSOR FOR
SELECT id FROM accounts ORDER BY id;

DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;

OPEN account_cursor;

read_loop: LOOP
START TRANSACTION;

SET processed = 0;

batch_loop: LOOP
FETCH account_cursor INTO account_id;

IF done THEN
LEAVE batch_loop;
END IF;

UPDATE accounts SET balance = balance * 1.05 WHERE id = account_id;
SET processed = processed + 1;

IF processed >= batch_size THEN
LEAVE batch_loop;
END IF;
END LOOP;

COMMIT;

IF done THEN
LEAVE read_loop;
END IF;
END LOOP;

CLOSE account_cursor;
END //
DELIMITER ;

-- 3. 使用合适的隔离级别
-- 对于读多写少的场景,使用READ COMMITTED
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;

-- 4. 避免热点行
-- 创建计数器表避免单行更新竞争
CREATE TABLE counters (
counter_name VARCHAR(50) PRIMARY KEY,
counter_value BIGINT DEFAULT 0,
shard_id INT DEFAULT 0,
INDEX idx_shard (counter_name, shard_id)
);

-- 分片计数器
INSERT INTO counters (counter_name, shard_id, counter_value) VALUES
('page_views', 0, 0),
('page_views', 1, 0),
('page_views', 2, 0),
('page_views', 3, 0);

-- 随机选择分片更新
UPDATE counters
SET counter_value = counter_value + 1
WHERE counter_name = 'page_views'
AND shard_id = FLOOR(RAND() * 4);

-- 查询总计数
SELECT SUM(counter_value) as total_views
FROM counters
WHERE counter_name = 'page_views';

2. 锁监控和调优

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
-- 创建锁监控表
CREATE TABLE lock_monitor_log (
id INT PRIMARY KEY AUTO_INCREMENT,
check_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
waiting_transactions INT,
lock_waits INT,
deadlocks INT,
avg_wait_time DECIMAL(10,3)
);

-- 锁监控存储过程
DELIMITER //
CREATE PROCEDURE monitor_locks()
BEGIN
DECLARE waiting_trx INT DEFAULT 0;
DECLARE lock_wait_count INT DEFAULT 0;
DECLARE deadlock_count INT DEFAULT 0;

-- 统计等待中的事务
SELECT COUNT(*) INTO waiting_trx
FROM performance_schema.data_locks
WHERE LOCK_STATUS = 'WAITING';

-- 统计锁等待
SELECT COUNT(*) INTO lock_wait_count
FROM performance_schema.data_lock_waits;

-- 获取死锁统计(需要解析SHOW ENGINE INNODB STATUS输出)
-- 这里简化处理
SET deadlock_count = 0;

-- 记录监控数据
INSERT INTO lock_monitor_log (waiting_transactions, lock_waits, deadlocks)
VALUES (waiting_trx, lock_wait_count, deadlock_count);

-- 输出当前状态
SELECT
waiting_trx as '等待事务数',
lock_wait_count as '锁等待数',
deadlock_count as '死锁数';

-- 如果有严重的锁等待,输出详细信息
IF lock_wait_count > 5 THEN
SELECT
'WARNING: 检测到大量锁等待,请检查以下事务:' as message;

SELECT
ENGINE_TRANSACTION_ID,
THREAD_ID,
OBJECT_SCHEMA,
OBJECT_NAME,
LOCK_TYPE,
LOCK_MODE,
LOCK_DATA
FROM performance_schema.data_locks
WHERE LOCK_STATUS = 'WAITING';
END IF;
END //
DELIMITER ;

-- 定期执行监控
-- 可以通过事件调度器定期执行
-- SET GLOBAL event_scheduler = ON;
-- CREATE EVENT lock_monitor_event
-- ON SCHEDULE EVERY 1 MINUTE
-- DO CALL monitor_locks();

最佳实践总结

1. 事务设计原则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
-- 1. 保持事务简短
-- 好的做法
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
INSERT INTO transactions (from_account_id, to_account_id, amount, transaction_type)
VALUES (1, 2, 100, 'transfer');
COMMIT;

-- 2. 避免用户交互
-- 不要在事务中等待用户输入

-- 3. 合理使用隔离级别
-- 根据业务需求选择最低的隔离级别

-- 4. 处理异常情况
DELIMITER //
CREATE PROCEDURE safe_transfer(
IN from_id INT,
IN to_id INT,
IN amount DECIMAL(10,2)
)
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION
BEGIN
ROLLBACK;
RESIGNAL;
END;

START TRANSACTION;

-- 业务逻辑
UPDATE accounts SET balance = balance - amount WHERE id = from_id;
UPDATE accounts SET balance = balance + amount WHERE id = to_id;

COMMIT;
END //
DELIMITER ;

2. 锁使用建议

  1. 尽量使用行级锁: 避免表级锁的使用
  2. 按顺序获取锁: 避免死锁的发生
  3. 及时释放锁: 缩短锁持有时间
  4. 选择合适的锁类型: 读操作使用共享锁,写操作使用排他锁
  5. 监控锁等待: 定期检查锁等待情况

3. 并发控制策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# Python 应用层并发控制示例
import pymysql
import time
import threading
from contextlib import contextmanager

class TransactionManager:
def __init__(self, db_config):
self.db_config = db_config

@contextmanager
def get_connection(self):
conn = pymysql.connect(**self.db_config)
try:
yield conn
finally:
conn.close()

def optimistic_update(self, account_id, new_balance):
"""乐观锁更新"""
max_retries = 3
for attempt in range(max_retries):
try:
with self.get_connection() as conn:
with conn.cursor() as cursor:
# 读取当前版本
cursor.execute(
"SELECT balance, version FROM accounts WHERE id = %s",
(account_id,)
)
result = cursor.fetchone()
if not result:
return False, "账户不存在"

current_balance, version = result

# 尝试更新
cursor.execute("""
UPDATE accounts
SET balance = %s, version = version + 1
WHERE id = %s AND version = %s
""", (new_balance, account_id, version))

if cursor.rowcount == 1:
conn.commit()
return True, "更新成功"
else:
# 版本冲突,重试
time.sleep(0.01 * (2 ** attempt)) # 指数退避
continue

except Exception as e:
return False, f"更新失败: {str(e)}"

return False, "重试次数超限"

def pessimistic_transfer(self, from_id, to_id, amount):
"""悲观锁转账"""
try:
with self.get_connection() as conn:
with conn.cursor() as cursor:
conn.begin()

# 按ID顺序锁定,避免死锁
ids = sorted([from_id, to_id])
for account_id in ids:
cursor.execute(
"SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
(account_id,)
)

# 检查余额
cursor.execute(
"SELECT balance FROM accounts WHERE id = %s",
(from_id,)
)
from_balance = cursor.fetchone()[0]

if from_balance < amount:
conn.rollback()
return False, "余额不足"

# 执行转账
cursor.execute(
"UPDATE accounts SET balance = balance - %s WHERE id = %s",
(amount, from_id)
)
cursor.execute(
"UPDATE accounts SET balance = balance + %s WHERE id = %s",
(amount, to_id)
)

conn.commit()
return True, "转账成功"

except Exception as e:
return False, f"转账失败: {str(e)}"

# 使用示例
if __name__ == "__main__":
db_config = {
'host': 'localhost',
'user': 'your_user',
'password': 'your_password',
'database': 'your_database',
'charset': 'utf8mb4'
}

tm = TransactionManager(db_config)

# 测试乐观锁
success, message = tm.optimistic_update(1, 1500.00)
print(f"乐观锁更新: {success}, {message}")

# 测试悲观锁转账
success, message = tm.pessimistic_transfer(1, 2, 100.00)
print(f"悲观锁转账: {success}, {message}")

通过深入理解MySQL的事务和锁机制,合理设计并发控制策略,可以确保数据库在高并发环境下的数据一致性和系统稳定性。记住,选择合适的并发控制策略需要根据具体的业务场景和性能要求来决定。

本站由 提供部署服务