MySQL数据库安全防护实战指南
Orion K Lv6

MySQL数据库安全防护实战指南

数据库安全是企业信息安全的重要组成部分。本文将从多个维度详细介绍MySQL数据库的安全防护策略和最佳实践,帮助构建安全可靠的数据库环境。

用户权限管理

1. 用户账户安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
-- Show the identity the client supplied and the account that was actually matched
SELECT USER(), CURRENT_USER();
SHOW GRANTS FOR CURRENT_USER();

-- List every account together with its password and lock status
SELECT
    User AS '用户名',
    Host AS '主机',
    authentication_string AS '密码哈希',
    password_expired AS '密码过期',
    password_last_changed AS '密码最后修改',
    password_lifetime AS '密码有效期',
    account_locked AS '账户锁定'
FROM mysql.user;

-- Create accounts with narrowly scoped privileges
-- 1. Application read-only account
CREATE USER 'app_readonly'@'192.168.1.%' IDENTIFIED BY 'StrongP@ssw0rd123!';
GRANT SELECT ON myapp.* TO 'app_readonly'@'192.168.1.%';

-- 2. Application read-write account
CREATE USER 'app_readwrite'@'192.168.1.%' IDENTIFIED BY 'StrongP@ssw0rd456!';
GRANT SELECT, INSERT, UPDATE, DELETE ON myapp.* TO 'app_readwrite'@'192.168.1.%';

-- 3. Database administrator account (local connections only)
CREATE USER 'db_admin'@'localhost' IDENTIFIED BY 'AdminP@ssw0rd789!';
GRANT ALL PRIVILEGES ON *.* TO 'db_admin'@'localhost' WITH GRANT OPTION;

-- 4. Dedicated backup account
CREATE USER 'backup_user'@'localhost' IDENTIFIED BY 'BackupP@ssw0rd!';
GRANT SELECT, LOCK TABLES, SHOW VIEW, EVENT, TRIGGER ON *.* TO 'backup_user'@'localhost';

-- Password policy
-- Inspect the password validation settings
SHOW VARIABLES LIKE 'validate_password%';

-- Install password validation if it is not installed yet.
-- The dotted variable names used below (validate_password.policy, ...) belong to
-- the validate_password COMPONENT; the legacy plugin exposes underscore names
-- (validate_password_policy, ...) instead, so the component is what must be installed.
-- INSTALL COMPONENT 'file://component_validate_password';

-- Configure the password policy
SET GLOBAL validate_password.policy = 'STRONG';
SET GLOBAL validate_password.length = 12;
SET GLOBAL validate_password.mixed_case_count = 1;
SET GLOBAL validate_password.number_count = 1;
SET GLOBAL validate_password.special_char_count = 1;

-- Password expiration: force a password change every 90 days
ALTER USER 'app_readonly'@'192.168.1.%' PASSWORD EXPIRE INTERVAL 90 DAY;
ALTER USER 'app_readwrite'@'192.168.1.%' PASSWORD EXPIRE INTERVAL 90 DAY;

-- Account locking: lock for 2 days after 3 consecutive failed logins
CREATE USER 'test_user'@'localhost' IDENTIFIED BY 'TestP@ssw0rd!'
    FAILED_LOGIN_ATTEMPTS 3
    PASSWORD_LOCK_TIME 2;

2. 权限最小化原则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
-- Accounts with fine-grained privileges
-- 1. Reporting account (may only read specific tables)
CREATE USER 'report_user'@'%' IDENTIFIED BY 'ReportP@ssw0rd!';
GRANT SELECT ON myapp.orders TO 'report_user'@'%';
GRANT SELECT ON myapp.users TO 'report_user'@'%';
GRANT SELECT ON myapp.products TO 'report_user'@'%';

-- 2. Data analysis account (read-only, sensitive tables excluded)
-- MySQL cannot subtract a table from a schema-level grant: a table-level REVOKE
-- only removes a table-level GRANT, so "GRANT ON myapp.* then REVOKE ON
-- myapp.user_passwords" fails with "There is no such grant defined".
-- Grant an explicit allow-list instead; myapp.user_passwords and
-- myapp.payment_info are deliberately left out. Extend the list as needed.
CREATE USER 'analyst'@'%' IDENTIFIED BY 'AnalystP@ssw0rd!';
GRANT SELECT ON myapp.orders TO 'analyst'@'%';
GRANT SELECT ON myapp.users TO 'analyst'@'%';
GRANT SELECT ON myapp.products TO 'analyst'@'%';

-- 3. Application service account (only the operations it needs)
CREATE USER 'order_service'@'192.168.1.%' IDENTIFIED BY 'OrderServiceP@ssw0rd!';
GRANT SELECT, INSERT, UPDATE ON myapp.orders TO 'order_service'@'192.168.1.%';
GRANT SELECT, UPDATE ON myapp.products TO 'order_service'@'192.168.1.%';
GRANT SELECT ON myapp.users TO 'order_service'@'192.168.1.%';

-- 4. Monitoring account (status information only)
CREATE USER 'monitor'@'localhost' IDENTIFIED BY 'MonitorP@ssw0rd!';
GRANT PROCESS, REPLICATION CLIENT ON *.* TO 'monitor'@'localhost';
GRANT SELECT ON performance_schema.* TO 'monitor'@'localhost';
-- No grant on information_schema: it is implicitly readable by every account
-- and the server rejects GRANT statements that target it.

-- Privilege audit: table-level grants held by the app_* accounts
-- ("\_" escapes the underscore, which is otherwise a single-character wildcard)
SELECT
    GRANTEE AS '用户',
    TABLE_SCHEMA AS '数据库',
    TABLE_NAME AS '表名',
    PRIVILEGE_TYPE AS '权限类型',
    IS_GRANTABLE AS '可授权'
FROM information_schema.TABLE_PRIVILEGES
WHERE GRANTEE LIKE '%app\_%'
ORDER BY GRANTEE, TABLE_SCHEMA, TABLE_NAME;

-- 查看用户的所有权限
DELIMITER //
-- Lists the global, schema-level and table-level privileges of one account.
--
-- username: account as 'user@host' (e.g. 'app_readonly@192.168.1.%').
--           A bare 'user' is treated as user@'%'. A value that already starts
--           with a quote is assumed to be in the GRANTEE form 'user'@'host'
--           and is used unchanged.
-- Result:   one row per privilege: privilege_level, privilege_type,
--           object_schema, object_name.
CREATE PROCEDURE ShowUserPrivileges(IN username VARCHAR(100))
BEGIN
    DECLARE user_host VARCHAR(300);

    -- information_schema stores GRANTEE as 'user'@'host' (each part quoted
    -- separately), so the input must be split at '@' before quoting.
    SET user_host = CASE
        WHEN LEFT(username, 1) = '''' THEN username
        WHEN LOCATE('@', username) > 0 THEN CONCAT(
            '''', SUBSTRING_INDEX(username, '@', 1), '''@''',
            SUBSTRING_INDEX(username, '@', -1), ''''
        )
        ELSE CONCAT('''', username, '''@''%''')
    END;

    -- Global privileges
    SELECT
        'GLOBAL PRIVILEGES' AS privilege_level,
        PRIVILEGE_TYPE AS privilege_type,
        'N/A' AS object_schema,
        'N/A' AS object_name
    FROM information_schema.USER_PRIVILEGES
    WHERE GRANTEE = user_host

    UNION ALL

    -- Schema-level privileges (the schema column is TABLE_SCHEMA in this table)
    SELECT
        'SCHEMA PRIVILEGES' AS privilege_level,
        PRIVILEGE_TYPE AS privilege_type,
        TABLE_SCHEMA AS object_schema,
        'N/A' AS object_name
    FROM information_schema.SCHEMA_PRIVILEGES
    WHERE GRANTEE = user_host

    UNION ALL

    -- Table-level privileges
    SELECT
        'TABLE PRIVILEGES' AS privilege_level,
        PRIVILEGE_TYPE AS privilege_type,
        TABLE_SCHEMA AS object_schema,
        TABLE_NAME AS object_name
    FROM information_schema.TABLE_PRIVILEGES
    WHERE GRANTEE = user_host

    ORDER BY privilege_level, object_schema, object_name;
END //
DELIMITER ;

-- 使用权限查询存储过程
CALL ShowUserPrivileges('app_readonly@192.168.1.%');

3. 角色管理(MySQL 8.0+)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
-- Create roles
CREATE ROLE 'app_developer', 'app_tester', 'data_analyst', 'db_monitor';

-- Grant privileges to the roles
-- Developer role
GRANT SELECT, INSERT, UPDATE, DELETE ON myapp.* TO 'app_developer';
GRANT CREATE, DROP, ALTER ON myapp.* TO 'app_developer';

-- Tester role
GRANT SELECT, INSERT, UPDATE, DELETE ON test_db.* TO 'app_tester';

-- Data analyst role
GRANT SELECT ON myapp.orders TO 'data_analyst';
GRANT SELECT ON myapp.users TO 'data_analyst';
GRANT SELECT ON myapp.products TO 'data_analyst';

-- Monitoring role
GRANT PROCESS, REPLICATION CLIENT ON *.* TO 'db_monitor';
GRANT SELECT ON performance_schema.* TO 'db_monitor';

-- Create users and assign roles
CREATE USER 'john_dev'@'%' IDENTIFIED BY 'DevP@ssw0rd!';
CREATE USER 'jane_analyst'@'%' IDENTIFIED BY 'AnalystP@ssw0rd!';

GRANT 'app_developer' TO 'john_dev'@'%';
GRANT 'data_analyst' TO 'jane_analyst'@'%';

-- Default roles are activated automatically at login
SET DEFAULT ROLE 'app_developer' TO 'john_dev'@'%';
SET DEFAULT ROLE 'data_analyst' TO 'jane_analyst'@'%';

-- Role information for the current session
SELECT * FROM information_schema.APPLICABLE_ROLES;
SELECT * FROM information_schema.ENABLED_ROLES;

-- Table-level privileges held by the roles.
-- A role created without a host part is stored with host '%', and GRANTEE has
-- the form 'name'@'host', so the host must be part of each literal.
-- Only table-level grants appear here; grants made ON db.* are listed in
-- information_schema.SCHEMA_PRIVILEGES and grants ON *.* in USER_PRIVILEGES.
SELECT
    GRANTEE AS '角色/用户',
    PRIVILEGE_TYPE AS '权限',
    TABLE_SCHEMA AS '数据库',
    TABLE_NAME AS '表名'
FROM information_schema.TABLE_PRIVILEGES
WHERE GRANTEE IN (
    '''app_developer''@''%''',
    '''data_analyst''@''%''',
    '''db_monitor''@''%'''
)
ORDER BY GRANTEE, TABLE_SCHEMA;

数据加密和保护

1. 传输加密(SSL/TLS)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
-- Inspect the server's SSL/TLS configuration and status counters
SHOW VARIABLES LIKE '%ssl%';
SHOW STATUS LIKE 'Ssl%';

-- Configured certificate, key and cipher settings
SELECT
    VARIABLE_NAME AS '配置项',
    VARIABLE_VALUE AS '值'
FROM performance_schema.global_variables
WHERE VARIABLE_NAME IN (
    'ssl_ca', 'ssl_cert', 'ssl_key',
    'ssl_cipher', 'ssl_crl', 'ssl_crlpath'
);

-- Account that must connect over an encrypted connection
CREATE USER 'secure_user'@'%' IDENTIFIED BY 'SecureP@ssw0rd!' REQUIRE SSL;

-- Account that must present a valid client certificate
CREATE USER 'cert_user'@'%' IDENTIFIED BY 'CertP@ssw0rd!'
    REQUIRE X509;

-- Account restricted to a specific cipher and certificate issuer.
-- CIPHER / ISSUER / SUBJECT cannot be combined with the SSL keyword; they
-- already imply an encrypted connection (ISSUER also implies X509).
-- NOTE(review): AES256-SHA is a legacy CBC/SHA-1 suite; consider a modern
-- AEAD cipher that your server build supports.
CREATE USER 'strict_ssl_user'@'%' IDENTIFIED BY 'StrictP@ssw0rd!'
    REQUIRE CIPHER 'AES256-SHA'
    AND ISSUER '/C=US/ST=CA/L=San Francisco/O=MyCompany/CN=MyCA';

-- TLS state of the CURRENT connection.
-- @@ssl_cipher is the server's configured cipher list and @@ssl_version does
-- not exist; the negotiated values are session status variables.
SELECT
    CONNECTION_ID() AS connection_id,
    USER() AS user,
    (
        SELECT VARIABLE_VALUE
        FROM performance_schema.session_status
        WHERE VARIABLE_NAME = 'Ssl_cipher'
    ) AS ssl_cipher,
    (
        SELECT VARIABLE_VALUE
        FROM performance_schema.session_status
        WHERE VARIABLE_NAME = 'Ssl_version'
    ) AS ssl_version;

-- Server-wide SSL connection statistics
SELECT
    VARIABLE_NAME AS '指标',
    VARIABLE_VALUE AS '值'
FROM performance_schema.global_status
WHERE VARIABLE_NAME IN (
    'Ssl_accepts', 'Ssl_finished_accepts', 'Ssl_finished_connects',
    'Ssl_client_connects', 'Ssl_connect_renegotiates'
);

2. 数据静态加密

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
-- Encrypted general tablespace (MySQL 8.0+)
CREATE TABLESPACE encrypted_space
    ADD DATAFILE 'encrypted_space.ibd'
    ENCRYPTION = 'Y';

-- Table stored in the encrypted tablespace
CREATE TABLE sensitive_data (
    id                 INT AUTO_INCREMENT PRIMARY KEY,
    user_id            INT NOT NULL,
    credit_card_number VARCHAR(255),
    ssn                VARCHAR(255),
    created_at         TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) TABLESPACE encrypted_space ENCRYPTION = 'Y';

-- Turn on encryption for a table that already exists
ALTER TABLE user_payments ENCRYPTION = 'Y';

-- Which tables carry an ENCRYPTION create option?
SELECT
    TABLE_SCHEMA AS '数据库',
    TABLE_NAME AS '表名',
    CREATE_OPTIONS AS '创建选项'
FROM information_schema.TABLES
WHERE CREATE_OPTIONS LIKE '%ENCRYPTION%';

-- Column-level encryption example
CREATE TABLE user_sensitive_info (
    id                INT AUTO_INCREMENT PRIMARY KEY,
    username          VARCHAR(50) NOT NULL,
    email             VARCHAR(100) NOT NULL,
    -- AES ciphertext of the sensitive values
    encrypted_phone   VARBINARY(255),
    encrypted_address VARBINARY(255),
    created_at        TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Store values encrypted with AES_ENCRYPT
INSERT INTO user_sensitive_info (username, email, encrypted_phone, encrypted_address)
VALUES (
    'john_doe',
    'john@example.com',
    AES_ENCRYPT('13800138000', 'encryption_key_phone'),
    AES_ENCRYPT('123 Main St, City, State', 'encryption_key_address')
);

-- Read the values back with the matching keys
SELECT
    id,
    username,
    email,
    AES_DECRYPT(encrypted_phone, 'encryption_key_phone') AS phone,
    AES_DECRYPT(encrypted_address, 'encryption_key_address') AS address
FROM user_sensitive_info;

-- 创建加密函数
DELIMITER //
-- Encrypts a value with AES using the key that belongs to a named category.
--
-- data:     plaintext to encrypt (NULL yields NULL).
-- key_name: 'phone', 'address' or 'credit_card'; any other value, including
--           NULL, selects the default key.
-- Returns:  the AES ciphertext. AES_ENCRYPT pads to 16-byte blocks, so
--           plaintext longer than 239 bytes does not fit in VARBINARY(255).
--
-- The function touches no tables, hence NO SQL rather than READS SQL DATA.
-- WARNING: the keys are embedded in the routine body for illustration only;
-- anyone who can read the routine definition can read them. Production code
-- should obtain keys from a keyring or key-management service.
CREATE FUNCTION EncryptSensitiveData(data TEXT, key_name VARCHAR(50))
RETURNS VARBINARY(255)
NO SQL
DETERMINISTIC
BEGIN
    DECLARE encryption_key VARCHAR(255);

    -- Map the category name to its key
    SET encryption_key = CASE key_name
        WHEN 'phone' THEN 'phone_encryption_key_2023'
        WHEN 'address' THEN 'address_encryption_key_2023'
        WHEN 'credit_card' THEN 'cc_encryption_key_2023'
        ELSE 'default_encryption_key_2023'
    END;

    RETURN AES_ENCRYPT(data, encryption_key);
END //

-- Decrypts a value produced by AES_ENCRYPT with the key of a named category.
--
-- encrypted_data: AES ciphertext (NULL yields NULL).
-- key_name:       'phone', 'address' or 'credit_card'; any other value,
--                 including NULL, selects the default key.
-- Returns:        the plaintext, or NULL when AES_DECRYPT cannot decrypt the
--                 input (for example when the wrong key is used).
--
-- The function touches no tables, hence NO SQL rather than READS SQL DATA.
-- WARNING: the keys are embedded in the routine body for illustration only;
-- production code should obtain keys from a keyring or key-management service.
CREATE FUNCTION DecryptSensitiveData(encrypted_data VARBINARY(255), key_name VARCHAR(50))
RETURNS TEXT
NO SQL
DETERMINISTIC
BEGIN
    DECLARE encryption_key VARCHAR(255);

    -- Map the category name to its key
    SET encryption_key = CASE key_name
        WHEN 'phone' THEN 'phone_encryption_key_2023'
        WHEN 'address' THEN 'address_encryption_key_2023'
        WHEN 'credit_card' THEN 'cc_encryption_key_2023'
        ELSE 'default_encryption_key_2023'
    END;

    RETURN AES_DECRYPT(encrypted_data, encryption_key);
END //
DELIMITER ;

-- Write a row through the encryption helper
INSERT INTO user_sensitive_info (username, email, encrypted_phone, encrypted_address)
VALUES (
    'jane_doe',
    'jane@example.com',
    EncryptSensitiveData('13900139000', 'phone'),
    EncryptSensitiveData('456 Oak Ave, Town, State', 'address')
);

-- Read the rows back, decrypting on the fly
SELECT
    id,
    username,
    email,
    DecryptSensitiveData(encrypted_phone, 'phone') AS phone,
    DecryptSensitiveData(encrypted_address, 'address') AS address
FROM user_sensitive_info;

3. 数据脱敏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
-- 创建数据脱敏函数
DELIMITER //
-- Masks a card number, keeping only the first 4 and last 4 characters.
--
-- card_number: the raw card number (NULL yields NULL).
-- Returns:     e.g. '1234********3456'. Values of 8 characters or fewer are
--              masked completely: keeping 4 + 4 characters would reveal the
--              whole value, which defeats the purpose of a masking function.
--
-- Pure string manipulation, hence NO SQL; CHAR_LENGTH counts characters
-- rather than bytes so multi-byte input is handled correctly.
CREATE FUNCTION MaskCreditCard(card_number VARCHAR(20))
RETURNS VARCHAR(20)
NO SQL
DETERMINISTIC
BEGIN
    DECLARE card_len INT;

    IF card_number IS NULL THEN
        RETURN NULL;
    END IF;

    SET card_len = CHAR_LENGTH(card_number);

    -- Too short to expose any part safely
    IF card_len <= 8 THEN
        RETURN REPEAT('*', card_len);
    END IF;

    RETURN CONCAT(
        LEFT(card_number, 4),
        REPEAT('*', card_len - 8),
        RIGHT(card_number, 4)
    );
END //

-- Masks a phone number.
--
-- phone:   the raw phone number (NULL yields NULL).
-- Returns: for an 11-digit mainland-China mobile number, the first 3 and last
--          4 digits with '****' in between (138****8000); for any other
--          number of 7+ characters, the first 3 and last 2 characters with
--          the rest starred. Values shorter than 7 characters are masked
--          completely instead of being returned in clear text.
--
-- Pure string manipulation, hence NO SQL; CHAR_LENGTH counts characters
-- rather than bytes.
CREATE FUNCTION MaskPhone(phone VARCHAR(20))
RETURNS VARCHAR(20)
NO SQL
DETERMINISTIC
BEGIN
    DECLARE phone_len INT;

    IF phone IS NULL THEN
        RETURN NULL;
    END IF;

    SET phone_len = CHAR_LENGTH(phone);

    -- Too short to expose any part safely
    IF phone_len < 7 THEN
        RETURN REPEAT('*', phone_len);
    END IF;

    -- Mainland-China mobile numbers: 1, then 3-9, then nine more digits
    IF phone_len = 11 AND phone REGEXP '^1[3-9][0-9]{9}$' THEN
        RETURN CONCAT(LEFT(phone, 3), '****', RIGHT(phone, 4));
    END IF;

    RETURN CONCAT(LEFT(phone, 3), REPEAT('*', phone_len - 5), RIGHT(phone, 2));
END //

-- Masks the local part of an e-mail address, leaving the domain readable.
--
-- email:   the raw address. NULL and values without '@' are returned unchanged.
-- Returns: local parts of up to 2 characters become first character + '*';
--          longer ones keep the first and last character with the middle
--          starred (john@example.com -> j**n@example.com).
--
-- The working variables are as wide as the parameter: with VARCHAR(50) a long
-- local part or domain would be truncated or rejected in strict mode.
-- Pure string manipulation, hence NO SQL; CHAR_LENGTH counts characters
-- rather than bytes.
CREATE FUNCTION MaskEmail(email VARCHAR(100))
RETURNS VARCHAR(100)
NO SQL
DETERMINISTIC
BEGIN
    DECLARE at_pos INT;
    DECLARE username_part VARCHAR(100);
    DECLARE domain_part VARCHAR(100);
    DECLARE username_len INT;

    IF email IS NULL OR email NOT LIKE '%@%' THEN
        RETURN email;
    END IF;

    -- Split at the first '@'; domain_part keeps the '@' itself
    SET at_pos = LOCATE('@', email);
    SET username_part = LEFT(email, at_pos - 1);
    SET domain_part = SUBSTRING(email, at_pos);
    SET username_len = CHAR_LENGTH(username_part);

    -- Mask the local part
    IF username_len <= 2 THEN
        SET username_part = CONCAT(LEFT(username_part, 1), '*');
    ELSE
        SET username_part = CONCAT(
            LEFT(username_part, 1),
            REPEAT('*', username_len - 2),
            RIGHT(username_part, 1)
        );
    END IF;

    RETURN CONCAT(username_part, domain_part);
END //
DELIMITER ;

-- View over users that exposes e-mail and phone only in masked form
CREATE VIEW users_masked AS
SELECT
    u.id,
    u.username,
    MaskEmail(u.email) AS email,
    MaskPhone(u.phone) AS phone,
    u.region,
    u.created_at
FROM users AS u;

-- Log of accesses to sensitive data: who touched which table, how, and when
CREATE TABLE sensitive_data_access_log (
    id               INT AUTO_INCREMENT PRIMARY KEY,
    user_name        VARCHAR(100) NOT NULL,
    table_name       VARCHAR(64) NOT NULL,
    operation        ENUM('SELECT', 'INSERT', 'UPDATE', 'DELETE') NOT NULL,
    sensitive_fields JSON,                                 -- names of the sensitive columns involved
    access_time      TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    ip_address       VARCHAR(45),
    INDEX idx_user_time (user_name, access_time),
    INDEX idx_table_time (table_name, access_time)
);

-- 创建敏感数据访问触发器
DELIMITER //
-- Logs changes to the sensitive columns of users.
--
-- MySQL triggers fire only on INSERT, UPDATE and DELETE; there is no SELECT
-- trigger, so "AFTER SELECT" is a syntax error. Read access has to be audited
-- elsewhere (an audit plugin, or by routing reads through a stored procedure
-- that writes to sensitive_data_access_log). This trigger records writes.
--
-- Assumes users has email and phone columns — confirm against the users
-- table definition. @client_ip is a session variable the application is
-- expected to set; 'unknown' is recorded when it is not set.
CREATE TRIGGER tr_log_sensitive_access
AFTER UPDATE ON users
FOR EACH ROW
BEGIN
    -- <=> is NULL-safe equality, so NULL -> value changes are detected too
    IF NOT (OLD.email <=> NEW.email) OR NOT (OLD.phone <=> NEW.phone) THEN
        INSERT INTO sensitive_data_access_log (
            user_name, table_name, operation, sensitive_fields, ip_address
        ) VALUES (
            USER(), 'users', 'UPDATE',
            JSON_ARRAY('email', 'phone'),
            COALESCE(@client_ip, 'unknown')
        );
    END IF;
END //
DELIMITER ;

审计和监控

1. 审计日志配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
-- Is an audit plugin installed, and what is its status?
SHOW PLUGINS;

SELECT
    PLUGIN_NAME,
    PLUGIN_STATUS
FROM information_schema.PLUGINS
WHERE PLUGIN_NAME LIKE '%audit%';

-- Audit-related configuration variables
SHOW VARIABLES LIKE '%audit%';

-- Application-level audit log: one row per recorded database operation
CREATE TABLE mysql_audit_log (
    id             BIGINT AUTO_INCREMENT PRIMARY KEY,
    timestamp      TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    connection_id  INT,
    username       VARCHAR(100),
    host           VARCHAR(100),
    command_type   VARCHAR(50),
    sql_text       TEXT,
    affected_rows  INT,
    execution_time DECIMAL(10,6),
    error_code     INT,
    error_message  TEXT,
    INDEX idx_timestamp (timestamp),
    INDEX idx_username (username),
    INDEX idx_command_type (command_type)
);

-- 创建审计日志记录存储过程(简化版本)
DELIMITER //
-- Writes one entry to mysql_audit_log.
--
-- p_username:       user name to record
-- p_command_type:   kind of operation
-- p_sql_text:       statement text
-- p_affected_rows:  number of rows affected
-- p_execution_time: execution time
--
-- connection_id is taken from CONNECTION_ID() and host from the part of
-- USER() after the last '@'.
CREATE PROCEDURE LogDatabaseActivity(
    IN p_username VARCHAR(100),
    IN p_command_type VARCHAR(50),
    IN p_sql_text TEXT,
    IN p_affected_rows INT,
    IN p_execution_time DECIMAL(10,6)
)
BEGIN
    INSERT INTO mysql_audit_log (
        connection_id,
        username,
        host,
        command_type,
        sql_text,
        affected_rows,
        execution_time
    )
    VALUES (
        CONNECTION_ID(),
        p_username,
        SUBSTRING_INDEX(USER(), '@', -1),
        p_command_type,
        p_sql_text,
        p_affected_rows,
        p_execution_time
    );
END //
DELIMITER ;

-- The 20 most recent audit entries from the last day
SELECT
    timestamp AS '时间',
    username AS '用户',
    host AS '主机',
    command_type AS '操作类型',
    LEFT(sql_text, 100) AS 'SQL语句',
    affected_rows AS '影响行数',
    execution_time AS '执行时间'
FROM mysql_audit_log
WHERE timestamp >= NOW() - INTERVAL 1 DAY
ORDER BY timestamp DESC
LIMIT 20;

-- Sensitive operations per user and day over the last 7 days
SELECT
    DATE(timestamp) AS '日期',
    username AS '用户',
    COUNT(*) AS '操作次数',
    GROUP_CONCAT(DISTINCT command_type) AS '操作类型'
FROM mysql_audit_log
WHERE command_type IN ('DELETE', 'DROP', 'ALTER', 'GRANT', 'REVOKE')
  AND timestamp >= NOW() - INTERVAL 7 DAY
GROUP BY DATE(timestamp), username
ORDER BY DATE(timestamp) DESC, COUNT(*) DESC;

2. 安全监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
-- 创建安全事件监控表
-- Security events raised by the monitoring procedures.
-- event_type includes 'THRESHOLD_EXCEEDED' because CheckSecurityAlerts inserts
-- that value; without it the insert is rejected in strict SQL mode.
-- severity is declared from lowest to highest, so its ENUM index
-- (LOW=1 ... CRITICAL=4) can be used for ordering.
CREATE TABLE security_events (
    id            INT AUTO_INCREMENT PRIMARY KEY,
    event_type    ENUM(
        'LOGIN_FAILURE',
        'PRIVILEGE_ESCALATION',
        'SUSPICIOUS_QUERY',
        'DATA_EXPORT',
        'AFTER_HOURS_ACCESS',
        'THRESHOLD_EXCEEDED'
    ) NOT NULL,
    username      VARCHAR(100),
    host          VARCHAR(100),
    event_details JSON,
    severity      ENUM('LOW', 'MEDIUM', 'HIGH', 'CRITICAL') NOT NULL,
    event_time    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    resolved      TINYINT DEFAULT 0,
    INDEX idx_event_type (event_type),
    INDEX idx_severity (severity),
    INDEX idx_event_time (event_time),
    INDEX idx_resolved (resolved)
);

-- 登录失败监控
DELIMITER //
-- Records a LOGIN_FAILURE event for every user/host pair with at least
-- 5 failed connection attempts in the last hour.
--
-- Severity: >= 20 failures CRITICAL, >= 10 HIGH, otherwise MEDIUM.
-- One set-based INSERT ... SELECT is used so that ALL offending pairs are
-- reported, not only the single pair with the most failures.
CREATE PROCEDURE MonitorLoginFailures()
BEGIN
    INSERT INTO security_events (
        event_type, username, host, event_details, severity
    )
    SELECT
        'LOGIN_FAILURE',
        username,
        host,
        JSON_OBJECT('failed_attempts', COUNT(*), 'time_window', '1 hour'),
        CASE
            WHEN COUNT(*) >= 20 THEN 'CRITICAL'
            WHEN COUNT(*) >= 10 THEN 'HIGH'
            ELSE 'MEDIUM'
        END
    FROM mysql_audit_log
    WHERE command_type = 'Connect'
      AND error_code != 0
      AND timestamp >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
    GROUP BY username, host
    -- Threshold: 5 or more failures within the window
    HAVING COUNT(*) >= 5;
END //
DELIMITER ;

-- 可疑查询监控
DELIMITER //
-- Records a HIGH-severity SUSPICIOUS_QUERY event for every audit-log entry of
-- the last hour whose statement text matches one of the suspicious patterns.
--
-- A single INSERT ... SELECT replaces the former row-by-row cursor loop; the
-- rows inserted are the same, without per-row statement overhead.
-- Each event stores the first 500 characters of the statement and its time.
CREATE PROCEDURE MonitorSuspiciousQueries()
BEGIN
    INSERT INTO security_events (
        event_type, username, event_details, severity
    )
    SELECT
        'SUSPICIOUS_QUERY',
        username,
        JSON_OBJECT('sql_text', LEFT(sql_text, 500), 'timestamp', timestamp),
        'HIGH'
    FROM mysql_audit_log
    WHERE timestamp >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
      AND (
          sql_text LIKE '%SELECT%*%FROM%' OR          -- possible SQL injection
          sql_text LIKE '%UNION%SELECT%' OR
          sql_text LIKE '%DROP%TABLE%' OR
          sql_text LIKE '%DELETE%FROM%users%' OR
          sql_text REGEXP '.*;.*SELECT.*FROM.*information_schema'  -- reconnaissance
      );
END //
DELIMITER ;

-- 数据导出监控
DELIMITER //
-- Raises one MEDIUM DATA_EXPORT event when the last hour of the audit log
-- contains at least one SELECT with more than 10000 affected rows.
CREATE PROCEDURE MonitorDataExport()
BEGIN
    DECLARE bulk_select_total INT DEFAULT 0;

    -- Count the oversized SELECTs in the window
    SELECT COUNT(*)
    INTO bulk_select_total
    FROM mysql_audit_log
    WHERE timestamp >= NOW() - INTERVAL 1 HOUR
      AND command_type = 'SELECT'
      AND affected_rows > 10000;

    IF bulk_select_total >= 1 THEN
        INSERT INTO security_events (event_type, event_details, severity)
        VALUES (
            'DATA_EXPORT',
            JSON_OBJECT('large_exports', bulk_select_total, 'threshold', 10000),
            'MEDIUM'
        );
    END IF;
END //
DELIMITER ;

-- 非工作时间访问监控
DELIMITER //
-- Raises one MEDIUM AFTER_HOURS_ACCESS event when, during off-hours, more
-- than 10 data statements were logged in the last hour.
--
-- Off-hours are 22:00 up to (but not including) 06:00. HOUR() is an integer,
-- so the upper bound must be "< 6": "<= 6" would also treat 06:00-06:59 as
-- off-hours.
CREATE PROCEDURE MonitorAfterHoursAccess()
BEGIN
    DECLARE after_hours_count INT DEFAULT 0;
    DECLARE current_hour INT DEFAULT HOUR(NOW());

    IF current_hour >= 22 OR current_hour < 6 THEN
        SELECT COUNT(*)
        INTO after_hours_count
        FROM mysql_audit_log
        WHERE timestamp >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
          AND command_type IN ('SELECT', 'INSERT', 'UPDATE', 'DELETE');

        IF after_hours_count > 10 THEN
            INSERT INTO security_events (
                event_type, event_details, severity
            ) VALUES (
                'AFTER_HOURS_ACCESS',
                JSON_OBJECT('activity_count', after_hours_count, 'hour', current_hour),
                'MEDIUM'
            );
        END IF;
    END IF;
END //
DELIMITER ;

-- 综合安全监控存储过程
DELIMITER //
-- Runs all monitoring procedures, then returns a report of the unresolved
-- events of the last 24 hours grouped by event type, most severe first.
--
-- MAX() on an ENUM column compares the string values, so MAX(severity) would
-- return the alphabetically greatest label ('MEDIUM' beats 'CRITICAL').
-- "severity + 0" yields the ENUM index (LOW=1, MEDIUM=2, HIGH=3, CRITICAL=4,
-- as declared on security_events.severity); ELT maps the highest index back
-- to its label.
CREATE PROCEDURE RunSecurityMonitoring()
BEGIN
    CALL MonitorLoginFailures();
    CALL MonitorSuspiciousQueries();
    CALL MonitorDataExport();
    CALL MonitorAfterHoursAccess();

    -- Security report
    SELECT
        event_type AS '事件类型',
        COUNT(*) AS '事件数量',
        ELT(MAX(severity + 0), 'LOW', 'MEDIUM', 'HIGH', 'CRITICAL') AS '最高严重级别',
        MAX(event_time) AS '最新事件时间'
    FROM security_events
    WHERE event_time >= DATE_SUB(NOW(), INTERVAL 24 HOUR)
      AND resolved = 0
    GROUP BY event_type
    ORDER BY MAX(severity + 0) DESC;
END //
DELIMITER ;

-- 执行安全监控
CALL RunSecurityMonitoring();

3. 性能和安全指标监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
-- View with one row per security metric, computed at query time:
-- metric_name, metric_value, unit, measured_at.
CREATE VIEW security_metrics AS
-- Connections that are currently doing something (not sleeping)
SELECT
    'Active Connections' AS metric_name,
    COUNT(*) AS metric_value,
    'connections' AS unit,
    NOW() AS measured_at
FROM information_schema.PROCESSLIST
WHERE COMMAND <> 'Sleep'

UNION ALL

-- Failed connection attempts during the last hour
SELECT
    'Failed Login Attempts (Last Hour)' AS metric_name,
    COUNT(*) AS metric_value,
    'attempts' AS unit,
    NOW() AS measured_at
FROM mysql_audit_log
WHERE timestamp >= NOW() - INTERVAL 1 HOUR
  AND command_type = 'Connect'
  AND error_code <> 0

UNION ALL

-- Account and privilege changes during the last hour
SELECT
    'Privileged Operations (Last Hour)' AS metric_name,
    COUNT(*) AS metric_value,
    'operations' AS unit,
    NOW() AS measured_at
FROM mysql_audit_log
WHERE timestamp >= NOW() - INTERVAL 1 HOUR
  AND command_type IN ('GRANT', 'REVOKE', 'CREATE USER', 'DROP USER')

UNION ALL

-- SELECTs with more than 1000 affected rows during the last hour
-- NOTE(review): MonitorDataExport uses a 10000-row threshold — confirm which is intended
SELECT
    'Large Data Exports (Last Hour)' AS metric_name,
    COUNT(*) AS metric_value,
    'queries' AS unit,
    NOW() AS measured_at
FROM mysql_audit_log
WHERE timestamp >= NOW() - INTERVAL 1 HOUR
  AND command_type = 'SELECT'
  AND affected_rows > 1000;

-- 查看安全指标
SELECT * FROM security_metrics;

-- Alert rules: compare a metric from security_metrics against a threshold
CREATE TABLE security_alert_rules (
    id                  INT AUTO_INCREMENT PRIMARY KEY,
    rule_name           VARCHAR(100) NOT NULL,
    metric_name         VARCHAR(100) NOT NULL,
    threshold_value     INT NOT NULL,
    comparison_operator ENUM('>', '<', '>=', '<=', '=') NOT NULL,
    severity            ENUM('LOW', 'MEDIUM', 'HIGH', 'CRITICAL') NOT NULL,
    is_active           TINYINT DEFAULT 1,
    created_at          TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Seed rules
INSERT INTO security_alert_rules
    (rule_name, metric_name, threshold_value, comparison_operator, severity)
VALUES
    ('High Connection Count', 'Active Connections', 100, '>', 'MEDIUM'),
    ('Excessive Login Failures', 'Failed Login Attempts (Last Hour)', 10, '>', 'HIGH'),
    ('Suspicious Privilege Operations', 'Privileged Operations (Last Hour)', 5, '>', 'HIGH'),
    ('Large Data Export Alert', 'Large Data Exports (Last Hour)', 20, '>', 'MEDIUM');

-- 告警检查存储过程
DELIMITER //
-- Evaluates every active rule in security_alert_rules against the current
-- value of its metric in security_metrics and records a THRESHOLD_EXCEEDED
-- event for each rule whose comparison holds.
--
-- The previous implementation built "SELECT value op threshold" as dynamic
-- SQL and then tested the SQL *text* (@check_sql) as a boolean, so alerts
-- were never decided by the comparison; EXECUTE also returned a stray result
-- set per rule, and a rule naming an unknown metric ended the cursor loop
-- through the NOT FOUND handler. The comparison is now done in plain SQL over
-- the fixed operator set of the comparison_operator ENUM, with no dynamic SQL.
--
-- Rules whose metric does not exist in security_metrics are skipped.
-- Requires security_events.event_type to accept 'THRESHOLD_EXCEEDED'.
CREATE PROCEDURE CheckSecurityAlerts()
BEGIN
    INSERT INTO security_events (
        event_type, event_details, severity
    )
    SELECT
        'THRESHOLD_EXCEEDED',
        JSON_OBJECT(
            'rule_name', r.rule_name,
            'metric_name', r.metric_name,
            'current_value', m.metric_value,
            'threshold', r.threshold_value
        ),
        r.severity
    FROM security_alert_rules AS r
    INNER JOIN security_metrics AS m
        ON m.metric_name = r.metric_name
    WHERE r.is_active = 1
      AND CASE r.comparison_operator
              WHEN '>'  THEN m.metric_value >  r.threshold_value
              WHEN '<'  THEN m.metric_value <  r.threshold_value
              WHEN '>=' THEN m.metric_value >= r.threshold_value
              WHEN '<=' THEN m.metric_value <= r.threshold_value
              WHEN '='  THEN m.metric_value =  r.threshold_value
              ELSE FALSE
          END;
END //
DELIMITER ;

网络安全配置

1. 防火墙和网络访问控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
-- Current connections, longest-running first
SELECT
    ID AS '连接ID',
    USER AS '用户',
    HOST AS '主机',
    DB AS '数据库',
    COMMAND AS '命令',
    TIME AS '时间',
    STATE AS '状态',
    INFO AS '查询信息'
FROM information_schema.PROCESSLIST
ORDER BY TIME DESC;

-- IP allow-list: single addresses, each with an optional CIDR range
CREATE TABLE ip_whitelist (
    id          INT AUTO_INCREMENT PRIMARY KEY,
    ip_address  VARCHAR(45) NOT NULL,
    ip_range    VARCHAR(50),
    description VARCHAR(255),
    is_active   TINYINT DEFAULT 1,
    created_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY uk_ip_address (ip_address)
);

-- Allowed addresses
INSERT INTO ip_whitelist
    (ip_address, ip_range, description)
VALUES
    ('192.168.1.100', '192.168.1.0/24', '办公网络'),
    ('10.0.0.50', '10.0.0.0/16', '内网服务器'),
    ('172.16.1.10', '172.16.1.0/24', '开发环境'),
    ('127.0.0.1', '127.0.0.1/32', '本地连接');

-- IP访问控制检查函数
DELIMITER //
-- Returns 1 when client_ip is allowed by an active ip_whitelist row, else 0.
--
-- client_ip: address to check. An exact match on ip_address works for any
--            address text; range matching is IPv4-only.
--
-- Range matching honours the CIDR prefix stored in ip_range ('a.b.c.d/n'):
-- the top n bits of the client address must equal the top n bits of the
-- network address. The previous version compared only a hard-coded leading
-- octet pattern, so e.g. every 10.x.x.x address passed a '10.0.0.0/16' rule.
--
-- Not declared DETERMINISTIC: the result depends on table contents, not only
-- on the argument.
CREATE FUNCTION IsIPAllowed(client_ip VARCHAR(45))
RETURNS TINYINT
READS SQL DATA
BEGIN
    DECLARE ip_count INT DEFAULT 0;
    DECLARE client_num BIGINT UNSIGNED;

    -- Exact address match
    SELECT COUNT(*) INTO ip_count
    FROM ip_whitelist
    WHERE ip_address = client_ip AND is_active = 1;

    IF ip_count > 0 THEN
        RETURN 1;
    END IF;

    -- Range matching needs a well-formed dotted-quad IPv4 address
    IF client_ip IS NULL
       OR client_ip NOT REGEXP '^[0-9]{1,3}(\\.[0-9]{1,3}){3}$' THEN
        RETURN 0;
    END IF;

    SET client_num = INET_ATON(client_ip);
    IF client_num IS NULL THEN
        RETURN 0;
    END IF;

    -- CIDR match. The CASE guard ensures the arithmetic only runs on rows
    -- whose ip_range is a well-formed 'a.b.c.d/n' with n in 0..32.
    SELECT COUNT(*) INTO ip_count
    FROM ip_whitelist
    WHERE is_active = 1
      AND CASE
              WHEN ip_range REGEXP '^[0-9]{1,3}(\\.[0-9]{1,3}){3}/([0-9]|[12][0-9]|3[0-2])$'
              THEN (client_num
                        >> (32 - CAST(SUBSTRING_INDEX(ip_range, '/', -1) AS UNSIGNED)))
                 = (INET_ATON(SUBSTRING_INDEX(ip_range, '/', 1))
                        >> (32 - CAST(SUBSTRING_INDEX(ip_range, '/', -1) AS UNSIGNED)))
              ELSE 0
          END;

    RETURN CASE WHEN ip_count > 0 THEN 1 ELSE 0 END;
END //
DELIMITER ;

-- Per-user connection limits (documentation table; the limits are enforced
-- by the ALTER USER statements below, not by this table)
CREATE TABLE connection_limits (
    id                       INT AUTO_INCREMENT PRIMARY KEY,
    username                 VARCHAR(100) NOT NULL,
    max_connections          INT NOT NULL DEFAULT 10,
    max_connections_per_hour INT NOT NULL DEFAULT 100,
    max_queries_per_hour     INT NOT NULL DEFAULT 1000,
    is_active                TINYINT DEFAULT 1,
    created_at               TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY uk_username (username)
);

-- Intended limits per user
INSERT INTO connection_limits
    (username, max_connections, max_connections_per_hour, max_queries_per_hour)
VALUES
    ('app_readonly', 5, 50, 500),
    ('app_readwrite', 10, 100, 1000),
    ('report_user', 3, 20, 200),
    ('analyst', 2, 10, 100);

-- Enforce the limits on the accounts.
-- An account is identified by user AND host: these accounts were created as
-- '...'@'192.168.1.%', so ALTER USER '...'@'%' would fail with an unknown
-- account. MAX_USER_CONNECTIONS applies the max_connections value recorded above.
ALTER USER 'app_readonly'@'192.168.1.%'
    WITH MAX_CONNECTIONS_PER_HOUR 50
         MAX_QUERIES_PER_HOUR 500
         MAX_USER_CONNECTIONS 5;
ALTER USER 'app_readwrite'@'192.168.1.%'
    WITH MAX_CONNECTIONS_PER_HOUR 100
         MAX_QUERIES_PER_HOUR 1000
         MAX_USER_CONNECTIONS 10;

2. 数据库配置安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
-- Security configuration overview: a header row followed by one row per
-- setting (name, current value, recommended value, security level)
SELECT
    '配置项' AS config_item,
    '当前值' AS current_value,
    '建议值' AS recommended_value,
    '安全级别' AS security_level
UNION ALL
SELECT 'local_infile',     @@local_infile,     '0',                       'HIGH'
UNION ALL
SELECT 'secure_file_priv', @@secure_file_priv, '指定目录',                'HIGH'
UNION ALL
SELECT 'sql_mode',         @@sql_mode,         '包含STRICT_TRANS_TABLES', 'MEDIUM'
UNION ALL
SELECT 'log_bin',          @@log_bin,          '1',                       'MEDIUM'
UNION ALL
SELECT 'general_log',      @@general_log,      '1',                       'MEDIUM'
UNION ALL
SELECT 'slow_query_log',   @@slow_query_log,   '1',                       'LOW';

-- 创建安全配置检查存储过程
DELIMITER //
-- Checks a few security-relevant server settings and returns one row per
-- setting: name, current value, secure / not secure, recommendation.
--
-- secure_file_priv has three states: NULL disables import/export entirely
-- (most restrictive), a directory confines file operations to it, and the
-- empty string imposes no restriction. Only the empty string is insecure;
-- the previous check reported NULL as insecure.
--
-- The scratch table is dropped up front as well, so a call that failed
-- midway does not make the next call in the same session fail with
-- "table already exists".
CREATE PROCEDURE CheckSecurityConfiguration()
BEGIN
    DROP TEMPORARY TABLE IF EXISTS security_config_check;

    CREATE TEMPORARY TABLE security_config_check (
        config_name VARCHAR(50),
        current_value VARCHAR(255),
        is_secure TINYINT,
        recommendation TEXT
    );

    -- local_infile: secure when disabled
    INSERT INTO security_config_check (config_name, current_value, is_secure, recommendation)
    VALUES (
        'local_infile',
        @@local_infile,
        CASE WHEN @@local_infile = 0 THEN 1 ELSE 0 END,
        '应该禁用以防止本地文件读取攻击'
    );

    -- secure_file_priv: secure when NULL (disabled) or set to a directory
    INSERT INTO security_config_check (config_name, current_value, is_secure, recommendation)
    VALUES (
        'secure_file_priv',
        @@secure_file_priv,
        CASE WHEN @@secure_file_priv IS NULL OR @@secure_file_priv != '' THEN 1 ELSE 0 END,
        '应该设置为特定目录以限制文件操作'
    );

    -- skip_networking: counted as secure when enabled (local connections only)
    INSERT INTO security_config_check (config_name, current_value, is_secure, recommendation)
    VALUES (
        'skip_networking',
        @@skip_networking,
        @@skip_networking,
        '如果只需要本地连接,建议启用'
    );

    -- Report
    SELECT
        config_name AS '配置项',
        current_value AS '当前值',
        CASE is_secure WHEN 1 THEN '安全' ELSE '不安全' END AS '安全状态',
        recommendation AS '建议'
    FROM security_config_check;

    DROP TEMPORARY TABLE security_config_check;
END //
DELIMITER ;

-- 执行安全配置检查
CALL CheckSecurityConfiguration();

备份和恢复安全

1. 安全备份策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
-- Backup task catalogue: one row per scheduled backup job.
CREATE TABLE backup_tasks (
    id                  INT AUTO_INCREMENT PRIMARY KEY,
    task_name           VARCHAR(100) NOT NULL,
    database_name       VARCHAR(64) NOT NULL,
    backup_type         ENUM('FULL', 'INCREMENTAL', 'DIFFERENTIAL') NOT NULL,
    backup_path         VARCHAR(255) NOT NULL,
    encryption_enabled  TINYINT DEFAULT 1,   -- on by default
    compression_enabled TINYINT DEFAULT 1,   -- on by default
    schedule_cron       VARCHAR(50),         -- cron-style expression (see seed rows below)
    retention_days      INT DEFAULT 30,
    is_active           TINYINT DEFAULT 1,
    created_at          TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    KEY idx_database_name (database_name),
    KEY idx_schedule (schedule_cron, is_active)
);

-- Seed the catalogue with a daily full, an hourly incremental and a weekly
-- archive job for the myapp database.
INSERT INTO backup_tasks
    (task_name, database_name, backup_type, backup_path, schedule_cron)
VALUES
    ('Daily Full Backup',  'myapp', 'FULL',        '/backup/mysql/daily/',       '0 2 * * *'),
    ('Hourly Incremental', 'myapp', 'INCREMENTAL', '/backup/mysql/incremental/', '0 * * * *'),
    ('Weekly Archive',     'myapp', 'FULL',        '/backup/mysql/archive/',     '0 1 * * 0');

-- Backup execution log: one row per run of a backup task.
CREATE TABLE backup_execution_log (
    id               INT AUTO_INCREMENT PRIMARY KEY,
    task_id          INT NOT NULL,                          -- references backup_tasks.id
    start_time       TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    end_time         TIMESTAMP NULL,                        -- NULL until the run is updated
    status           ENUM('RUNNING', 'SUCCESS', 'FAILED') NOT NULL,
    backup_file_path VARCHAR(500),
    backup_size_mb   DECIMAL(10,2),
    error_message    TEXT,
    checksum         VARCHAR(64),
    FOREIGN KEY (task_id) REFERENCES backup_tasks(id),
    KEY idx_task_id (task_id),
    KEY idx_start_time (start_time),
    KEY idx_status (status)
);

-- Stored procedure: ExecuteBackup
-- Looks up an active backup task, derives a timestamped backup file name,
-- writes a RUNNING row to backup_execution_log and then marks it SUCCESS with
-- simulated size and checksum values (no real backup command is executed here).
--
-- Parameters:
--   task_id  id of the row in backup_tasks to run.
-- Raises:
--   SQLSTATE '45000' when the task does not exist or is not active.
-- Result set:
--   one row, column `result`, with a completion message.
DELIMITER //
CREATE PROCEDURE ExecuteBackup(IN task_id INT)
BEGIN
    DECLARE v_task_name VARCHAR(100);
    DECLARE v_database_name VARCHAR(64);
    DECLARE v_backup_type VARCHAR(20);
    DECLARE v_backup_path VARCHAR(255);
    DECLARE v_encryption_enabled TINYINT;
    DECLARE v_backup_file VARCHAR(500);
    DECLARE v_log_id INT;
    DECLARE v_start_time TIMESTAMP DEFAULT NOW();

    -- Load the task definition (active tasks only).
    SELECT bt.task_name, bt.database_name, bt.backup_type, bt.backup_path, bt.encryption_enabled
    INTO v_task_name, v_database_name, v_backup_type, v_backup_path, v_encryption_enabled
    FROM backup_tasks AS bt
    WHERE bt.id = task_id AND bt.is_active = 1;

    -- task_name is NOT NULL in backup_tasks, so NULL here means no row matched.
    -- Without this guard the procedure would log a run with NULL metadata and
    -- report it as successful.
    IF v_task_name IS NULL THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = '备份任务不存在或未启用';
    END IF;

    -- Build the backup file name: <path><db>_<type>_<timestamp><extension>.
    SET v_backup_file = CONCAT(
        v_backup_path,
        v_database_name, '_',
        v_backup_type, '_',
        DATE_FORMAT(NOW(), '%Y%m%d_%H%i%s'),
        CASE WHEN v_encryption_enabled = 1 THEN '.sql.enc' ELSE '.sql' END
    );

    -- Record the start of the run.
    INSERT INTO backup_execution_log (task_id, start_time, status, backup_file_path)
    VALUES (task_id, v_start_time, 'RUNNING', v_backup_file);
    SET v_log_id = LAST_INSERT_ID();

    -- The real backup command would run here; SQL cannot invoke system
    -- commands directly, so the remainder is a simulation.

    -- Simulated completion. SHA-256 (64 hex characters) matches the width of
    -- the checksum column and avoids the collision-prone MD5.
    UPDATE backup_execution_log
    SET
        end_time = NOW(),
        status = 'SUCCESS',
        backup_size_mb = 150.5,
        checksum = SHA2(CONCAT(v_backup_file, NOW()), 256)
    WHERE id = v_log_id;

    SELECT CONCAT('备份任务 ', v_task_name, ' 执行完成') AS result;
END //
DELIMITER ;

-- Stored procedure: VerifyBackup
-- Records a verification entry for one backup run. The actual file check is a
-- placeholder: verification is assumed to succeed.
--
-- Parameters:
--   log_id  id of the row in backup_execution_log to verify.
-- Raises:
--   SQLSTATE '45000' when the log row does not exist or the backup did not
--   finish with status SUCCESS (same rule TestBackupRecovery applies).
-- Result set:
--   one row, column `result`, with the verification verdict.
-- Note: backup_verification_log must exist when the procedure is called; in
-- this script it is created by the statement following this definition.
DELIMITER //
CREATE PROCEDURE VerifyBackup(IN log_id INT)
BEGIN
    DECLARE v_backup_file VARCHAR(500);
    DECLARE v_checksum VARCHAR(64);
    DECLARE v_status VARCHAR(20);
    DECLARE v_verification_result TINYINT DEFAULT 0;

    SELECT bel.backup_file_path, bel.checksum, bel.status
    INTO v_backup_file, v_checksum, v_status
    FROM backup_execution_log AS bel
    WHERE bel.id = log_id;

    -- status is NOT NULL in backup_execution_log, so NULL means no row matched.
    IF v_status IS NULL OR v_status <> 'SUCCESS' THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = '备份记录不存在或备份未成功';
    END IF;

    -- Placeholder: a real implementation would recompute the file checksum via
    -- an external command and compare it with v_checksum.
    SET v_verification_result = 1; -- assume verification succeeded

    -- Record the verification outcome.
    INSERT INTO backup_verification_log (
        backup_log_id, verification_time, is_valid, verification_details
    ) VALUES (
        log_id, NOW(), v_verification_result,
        JSON_OBJECT('file_path', v_backup_file, 'checksum_match', v_verification_result)
    );

    SELECT
        CASE v_verification_result
            WHEN 1 THEN '备份验证成功'
            ELSE '备份验证失败'
        END AS result;
END //
DELIMITER ;

-- Backup verification log: one row per verification of a backup run.
CREATE TABLE backup_verification_log (
    id                   INT AUTO_INCREMENT PRIMARY KEY,
    backup_log_id        INT NOT NULL,                         -- references backup_execution_log.id
    verification_time    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    is_valid             TINYINT NOT NULL,
    verification_details JSON,
    FOREIGN KEY (backup_log_id) REFERENCES backup_execution_log(id),
    KEY idx_backup_log_id (backup_log_id),
    KEY idx_verification_time (verification_time)
);

2. 恢复测试和验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
-- Recovery test log: one row per restore rehearsal of a backup run.
CREATE TABLE recovery_tests (
    id                    INT AUTO_INCREMENT PRIMARY KEY,
    test_name             VARCHAR(100) NOT NULL,
    backup_log_id         INT NOT NULL,                         -- references backup_execution_log.id
    test_database         VARCHAR(64) NOT NULL,
    test_start_time       TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    test_end_time         TIMESTAMP NULL,                       -- NULL until the test row is updated
    test_status           ENUM('RUNNING', 'SUCCESS', 'FAILED') NOT NULL,
    recovery_time_seconds INT,
    data_integrity_check  TINYINT,
    error_details         TEXT,
    FOREIGN KEY (backup_log_id) REFERENCES backup_execution_log(id),
    KEY idx_backup_log_id (backup_log_id),
    KEY idx_test_start_time (test_start_time)
);

-- Stored procedure: TestBackupRecovery
-- Logs a restore rehearsal for a successful backup run. The restore itself is
-- simulated: the test row is opened as RUNNING and then closed with the
-- simulated outcome and the elapsed seconds.
--
-- Parameters:
--   backup_log_id  id of the backup_execution_log row to rehearse.
--   test_db_name   name stored as the scratch database for the test.
-- Raises:
--   SQLSTATE '45000' when no file path is found for a SUCCESS run with that id.
-- Result set:
--   one row, column `result`, with the test verdict.
DELIMITER //
CREATE PROCEDURE TestBackupRecovery(
    IN backup_log_id INT,
    IN test_db_name VARCHAR(64)
)
BEGIN
    DECLARE v_source_file VARCHAR(500);
    DECLARE v_run_id INT;
    DECLARE v_began_at TIMESTAMP DEFAULT NOW();
    DECLARE v_restored_ok TINYINT DEFAULT 0;

    -- Only runs that finished successfully are eligible.
    SELECT bel.backup_file_path
      INTO v_source_file
      FROM backup_execution_log AS bel
     WHERE bel.id = backup_log_id
       AND bel.status = 'SUCCESS';

    IF v_source_file IS NULL THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = '备份文件不存在或备份未成功';
    END IF;

    -- Open the test record.
    INSERT INTO recovery_tests (test_name, backup_log_id, test_database, test_status)
    VALUES (
        CONCAT('Recovery Test ', DATE_FORMAT(NOW(), '%Y%m%d_%H%i%s')),
        backup_log_id,
        test_db_name,
        'RUNNING'
    );
    SET v_run_id = LAST_INSERT_ID();

    -- A real implementation would, at this point:
    --   1. create the scratch database,
    --   2. restore the backup into it,
    --   3. verify data integrity.

    -- Simulated outcome: the restore succeeded.
    SET v_restored_ok = 1;

    -- Close the test record.
    UPDATE recovery_tests
       SET test_end_time = NOW(),
           test_status = CASE v_restored_ok WHEN 1 THEN 'SUCCESS' ELSE 'FAILED' END,
           recovery_time_seconds = TIMESTAMPDIFF(SECOND, v_began_at, NOW()),
           data_integrity_check = v_restored_ok
     WHERE id = v_run_id;

    SELECT
        CASE
            WHEN v_restored_ok = 1 THEN '恢复测试成功'
            ELSE '恢复测试失败'
        END AS result;
END //
DELIMITER ;

安全最佳实践总结

1. 安全检查清单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
-- Security checklist view.
-- Each branch yields one row: category, check_item, status, description.
-- MySQL does not allow a view's SELECT to reference system variables (@@name),
-- so server settings are read from performance_schema.global_variables. A
-- setting that is not present there (for example validate_password not
-- installed) evaluates to the non-passing status instead of raising an error.
CREATE VIEW security_checklist AS
SELECT
    '用户管理' AS category,
    '删除默认用户账户' AS check_item,
    -- Fails when an anonymous account or a root account on a host other than
    -- localhost exists. Parentheses make the intended OR/AND grouping explicit.
    CASE
        WHEN EXISTS (
            SELECT 1
            FROM mysql.user
            WHERE User = ''
               OR (User = 'root' AND Host != 'localhost')
        ) THEN '失败'
        ELSE '通过'
    END AS status,
    '删除匿名用户和远程root用户' AS description

UNION ALL

SELECT
    '用户管理',
    '密码策略',
    CASE
        WHEN EXISTS (
            SELECT 1
            FROM performance_schema.global_variables
            WHERE VARIABLE_NAME = 'validate_password.policy'
              AND VARIABLE_VALUE = 'STRONG'
        ) THEN '通过'
        ELSE '需要改进'
    END,
    '启用强密码策略'

UNION ALL

SELECT
    '网络安全',
    'SSL/TLS加密',
    -- have_ssl is deprecated and absent from newer servers; an enforced
    -- require_secure_transport is accepted as equivalent evidence.
    CASE
        WHEN EXISTS (
            SELECT 1
            FROM performance_schema.global_variables
            WHERE (VARIABLE_NAME = 'have_ssl' AND VARIABLE_VALUE = 'YES')
               OR (VARIABLE_NAME = 'require_secure_transport' AND VARIABLE_VALUE = 'ON')
        ) THEN '通过'
        ELSE '失败'
    END,
    '启用SSL/TLS传输加密'

UNION ALL

SELECT
    '配置安全',
    '禁用local_infile',
    -- Boolean variables are reported as ON/OFF in global_variables.
    CASE
        WHEN EXISTS (
            SELECT 1
            FROM performance_schema.global_variables
            WHERE VARIABLE_NAME = 'local_infile'
              AND VARIABLE_VALUE IN ('OFF', '0')
        ) THEN '通过'
        ELSE '失败'
    END,
    '防止本地文件读取攻击'

UNION ALL

SELECT
    '配置安全',
    '设置secure_file_priv',
    -- Only the empty string means "unrestricted". A directory restricts file
    -- operations and NULL disables import/export, so both pass.
    CASE
        WHEN EXISTS (
            SELECT 1
            FROM performance_schema.global_variables
            WHERE VARIABLE_NAME = 'secure_file_priv'
              AND (VARIABLE_VALUE IS NULL OR VARIABLE_VALUE != '')
        ) THEN '通过'
        ELSE '失败'
    END,
    '限制文件操作目录'

UNION ALL

SELECT
    '审计监控',
    '启用审计日志',
    CASE
        WHEN EXISTS (
            SELECT 1
            FROM information_schema.PLUGINS
            WHERE PLUGIN_NAME LIKE '%audit%'
              AND PLUGIN_STATUS = 'ACTIVE'
        ) THEN '通过'
        ELSE '需要配置'
    END,
    '记录数据库操作日志'

UNION ALL

SELECT
    '备份安全',
    '定期备份测试',
    -- Passes when at least one recovery test was started in the last 30 days.
    CASE
        WHEN EXISTS (
            SELECT 1
            FROM recovery_tests
            WHERE test_start_time >= DATE_SUB(NOW(), INTERVAL 30 DAY)
        ) THEN '通过'
        ELSE '需要执行'
    END,
    '定期测试备份恢复功能';

-- Show the checklist results.
SELECT category, check_item, status, description
FROM security_checklist
ORDER BY category, check_item;

2. 安全维护计划

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
-- Security maintenance task catalogue: recurring jobs and their schedule state.
CREATE TABLE security_maintenance_tasks (
    id               INT AUTO_INCREMENT PRIMARY KEY,
    task_name        VARCHAR(100) NOT NULL,
    task_type        ENUM('PASSWORD_ROTATION', 'PRIVILEGE_REVIEW', 'SECURITY_SCAN', 'BACKUP_TEST', 'LOG_CLEANUP') NOT NULL,
    frequency_days   INT NOT NULL,            -- interval between runs, in days
    last_executed    TIMESTAMP NULL,
    next_execution   TIMESTAMP NULL,
    is_active        TINYINT DEFAULT 1,
    task_description TEXT,
    created_at       TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Seed one task per task type.
INSERT INTO security_maintenance_tasks
    (task_name, task_type, frequency_days, task_description)
VALUES
    ('用户密码轮换', 'PASSWORD_ROTATION', 90, '定期要求用户更换密码'),
    ('权限审查',     'PRIVILEGE_REVIEW',  30, '审查用户权限是否合理'),
    ('安全扫描',     'SECURITY_SCAN',     7,  '扫描安全漏洞和配置问题'),
    ('备份恢复测试', 'BACKUP_TEST',       30, '测试备份文件的可恢复性'),
    ('日志清理',     'LOG_CLEANUP',       7,  '清理过期的审计和安全日志');

-- Stored procedure: ExecuteSecurityMaintenance
-- Walks every active maintenance task that is due (next_execution is NULL or
-- in the past), dispatches on task_type, then stamps last_executed and
-- schedules next_execution frequency_days ahead.
--
-- Calls ReviewUserPrivileges, RunSecurityScan and CleanupSecurityLogs; these
-- must exist when the procedure runs.
DELIMITER //
CREATE PROCEDURE ExecuteSecurityMaintenance()
BEGIN
    DECLARE done INT DEFAULT FALSE;
    DECLARE v_task_id INT;
    DECLARE v_task_name VARCHAR(100);
    DECLARE v_task_type VARCHAR(50);

    DECLARE maintenance_cursor CURSOR FOR
        SELECT id, task_name, task_type
        FROM security_maintenance_tasks
        WHERE is_active = 1
          AND (next_execution IS NULL OR next_execution <= NOW());

    -- FETCH past the last row raises NOT FOUND; turn it into the loop exit flag.
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;

    OPEN maintenance_cursor;

    maintenance_loop: LOOP
        FETCH maintenance_cursor INTO v_task_id, v_task_name, v_task_type;

        IF done THEN
            LEAVE maintenance_loop;
        END IF;

        -- Dispatch on the task type.
        CASE v_task_type
            WHEN 'PRIVILEGE_REVIEW' THEN
                CALL ReviewUserPrivileges();
            WHEN 'SECURITY_SCAN' THEN
                CALL RunSecurityScan();
            WHEN 'LOG_CLEANUP' THEN
                CALL CleanupSecurityLogs();
            WHEN 'BACKUP_TEST' THEN
                -- Placeholder for the backup test.
                SELECT '执行备份测试' AS task_result;
            ELSE
                -- A CASE statement with no matching WHEN and no ELSE raises
                -- "Case not found for CASE statement" and aborts the whole
                -- run; task types without an automated handler (such as
                -- PASSWORD_ROTATION) are reported instead.
                SELECT CONCAT('任务 ', v_task_name, ' 无自动处理程序,请人工执行') AS task_result;
        END CASE;

        -- Stamp the run and schedule the next one.
        UPDATE security_maintenance_tasks
        SET
            last_executed = NOW(),
            next_execution = DATE_ADD(NOW(), INTERVAL frequency_days DAY)
        WHERE id = v_task_id;

    END LOOP;

    CLOSE maintenance_cursor;
END //
DELIMITER ;

-- Stored procedure: ReviewUserPrivileges
-- Returns one result set (review_type, user_account, privilege, database_name,
-- issue_type) listing privileges that deserve review for accounts whose
-- grantee string contains neither 'root' nor 'admin'.
--
-- information_schema lists one row per individual privilege, so a literal
-- 'ALL PRIVILEGES' never appears there, and GRANT OPTION is exposed through
-- the IS_GRANTABLE column rather than as a PRIVILEGE_TYPE value. The
-- high-risk privilege list below is a policy choice; adjust it as needed.
DELIMITER //
CREATE PROCEDURE ReviewUserPrivileges()
BEGIN
    -- Database-level grants (GRANT ... ON db.*).
    SELECT
        '权限审查结果' AS review_type,
        GRANTEE AS user_account,
        PRIVILEGE_TYPE AS privilege,
        TABLE_SCHEMA AS database_name,
        '权限过高' AS issue_type
    FROM information_schema.SCHEMA_PRIVILEGES
    WHERE (PRIVILEGE_TYPE IN ('ALTER', 'DROP', 'TRIGGER') OR IS_GRANTABLE = 'YES')
      AND GRANTEE NOT LIKE '%root%'
      AND GRANTEE NOT LIKE '%admin%'

    UNION ALL

    -- Table-level grants (GRANT ... ON db.tbl).
    SELECT
        '权限审查结果',
        GRANTEE,
        PRIVILEGE_TYPE,
        TABLE_SCHEMA,
        '权限过高'
    FROM information_schema.TABLE_PRIVILEGES
    WHERE (PRIVILEGE_TYPE IN ('ALTER', 'DROP', 'TRIGGER') OR IS_GRANTABLE = 'YES')
      AND GRANTEE NOT LIKE '%root%'
      AND GRANTEE NOT LIKE '%admin%'

    UNION ALL

    -- Sensitive global privileges.
    SELECT
        '权限审查结果',
        GRANTEE,
        PRIVILEGE_TYPE,
        'GLOBAL',
        '全局权限'
    FROM information_schema.USER_PRIVILEGES
    WHERE PRIVILEGE_TYPE IN ('CREATE USER', 'SUPER')
      AND GRANTEE NOT LIKE '%root%'
      AND GRANTEE NOT LIKE '%admin%'

    UNION ALL

    -- Global GRANT OPTION: reported once per grantee.
    SELECT DISTINCT
        '权限审查结果',
        GRANTEE,
        'GRANT OPTION',
        'GLOBAL',
        '全局权限'
    FROM information_schema.USER_PRIVILEGES
    WHERE IS_GRANTABLE = 'YES'
      AND GRANTEE NOT LIKE '%root%'
      AND GRANTEE NOT LIKE '%admin%';
END //
DELIMITER ;

-- Stored procedure: RunSecurityScan
-- Returns three result sets from mysql.user, all with the columns
-- scan_type, username, host, issue:
--   1. accounts with an empty, missing or very short authentication string,
--   2. accounts whose password is expired,
--   3. accounts that are locked.
DELIMITER //
CREATE PROCEDURE RunSecurityScan()
BEGIN
    -- Accounts that may have no usable password (illustrative heuristic only:
    -- the stored value is a hash, so its length does not measure password
    -- strength). COALESCE makes a NULL authentication_string count as empty;
    -- a plain "= ''" comparison would skip such rows.
    -- NOTE(review): accounts using non-password authentication plugins may
    -- also show an empty value here -- confirm before acting on a hit.
    SELECT
        '安全扫描结果' AS scan_type,
        User AS username,
        Host AS host,
        '可能使用弱密码' AS issue
    FROM mysql.user
    WHERE COALESCE(authentication_string, '') = ''
       OR LENGTH(authentication_string) < 20;

    -- Accounts whose password is flagged as expired.
    SELECT
        '安全扫描结果' AS scan_type,
        User AS username,
        Host AS host,
        '密码已过期' AS issue
    FROM mysql.user
    WHERE password_expired = 'Y';

    -- Accounts that are locked.
    SELECT
        '安全扫描结果' AS scan_type,
        User AS username,
        Host AS host,
        '账户被锁定' AS issue
    FROM mysql.user
    WHERE account_locked = 'Y';
END //
DELIMITER ;

3. 应急响应计划

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
-- Security incident register: one row per incident, from detection to closure.
CREATE TABLE security_incident_response (
    id               INT AUTO_INCREMENT PRIMARY KEY,
    incident_id      VARCHAR(50) NOT NULL UNIQUE,
    incident_type    ENUM('DATA_BREACH', 'UNAUTHORIZED_ACCESS', 'PRIVILEGE_ESCALATION', 'MALWARE', 'DDOS') NOT NULL,
    severity         ENUM('LOW', 'MEDIUM', 'HIGH', 'CRITICAL') NOT NULL,
    status           ENUM('OPEN', 'INVESTIGATING', 'CONTAINED', 'RESOLVED', 'CLOSED') NOT NULL DEFAULT 'OPEN',
    detected_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    resolved_at      TIMESTAMP NULL,
    affected_systems TEXT,
    incident_details JSON,
    response_actions TEXT,
    lessons_learned  TEXT,
    KEY idx_incident_type (incident_type),
    KEY idx_severity (severity),
    KEY idx_status (status),
    KEY idx_detected_at (detected_at)
);

-- Stored procedure: HandleSecurityIncident
-- Registers a security incident, chooses a response checklist by incident
-- type, locks the reported account for UNAUTHORIZED_ACCESS incidents, and
-- writes an alert row to security_events.
--
-- Parameters:
--   incident_type     one of the incident_type values of security_incident_response.
--   severity          one of the severity values of security_incident_response.
--   incident_details  JSON document; '$.username' is read for UNAUTHORIZED_ACCESS.
-- Result set:
--   one row with the incident id, type, severity and response checklist.
-- Side effects:
--   updates mysql.user and reloads the grant tables for UNAUTHORIZED_ACCESS.
DELIMITER //
CREATE PROCEDURE HandleSecurityIncident(
    IN incident_type VARCHAR(50),
    IN severity VARCHAR(10),
    IN incident_details JSON
)
BEGIN
    DECLARE v_incident_id VARCHAR(50);
    DECLARE v_response_actions TEXT DEFAULT '';

    -- Generate the incident id. Microseconds are included because
    -- incident_id is UNIQUE and a second-resolution id collides when two
    -- incidents are reported within the same second.
    SET v_incident_id = CONCAT('INC-', DATE_FORMAT(NOW(6), '%Y%m%d-%H%i%s-%f'));

    -- Choose response measures by incident type (and severity).
    CASE incident_type
        WHEN 'UNAUTHORIZED_ACCESS' THEN
            SET v_response_actions = '1. 立即锁定相关账户\n2. 审查访问日志\n3. 检查数据完整性\n4. 通知安全团队';

            -- Lock the suspicious account on every host it is defined for.
            UPDATE mysql.user
            SET account_locked = 'Y'
            WHERE User = JSON_UNQUOTE(JSON_EXTRACT(incident_details, '$.username'));

            -- Direct changes to the grant tables are not applied until the
            -- server reloads them; without this the lock would not take
            -- effect. (ALTER USER ... ACCOUNT LOCK is the preferred interface
            -- when the full 'user'@'host' name is known.)
            FLUSH PRIVILEGES;

        WHEN 'PRIVILEGE_ESCALATION' THEN
            SET v_response_actions = '1. 撤销异常权限\n2. 审查权限变更日志\n3. 检查系统配置\n4. 加强权限监控';

        WHEN 'DATA_BREACH' THEN
            SET v_response_actions = '1. 隔离受影响系统\n2. 评估数据泄露范围\n3. 通知相关部门\n4. 准备公告';

            -- Critical breaches get an additional emergency step.
            IF severity = 'CRITICAL' THEN
                SET v_response_actions = CONCAT(v_response_actions, '\n5. 执行紧急隔离措施');
            END IF;

        ELSE
            -- Without an ELSE branch, types with no dedicated handling
            -- (MALWARE, DDOS) raise "Case not found for CASE statement" and
            -- the incident is never recorded.
            SET v_response_actions = '1. 通知安全团队\n2. 收集并保全相关日志';
    END CASE;

    -- Record the incident.
    INSERT INTO security_incident_response (
        incident_id, incident_type, severity, affected_systems,
        incident_details, response_actions
    ) VALUES (
        v_incident_id, incident_type, severity, 'MySQL Database',
        incident_details, v_response_actions
    );

    -- Raise an alert (recorded only; a real deployment would send mail or SMS).
    INSERT INTO security_events (
        event_type, event_details, severity
    ) VALUES (
        'SECURITY_INCIDENT',
        JSON_OBJECT('incident_id', v_incident_id, 'type', incident_type),
        severity
    );

    SELECT
        v_incident_id AS '事件ID',
        incident_type AS '事件类型',
        severity AS '严重程度',
        v_response_actions AS '响应措施';
END //
DELIMITER ;

-- Exercise the incident response procedure.
CALL HandleSecurityIncident(
    'UNAUTHORIZED_ACCESS',
    'HIGH',
    JSON_OBJECT('username', 'suspicious_user', 'ip_address', '192.168.1.999', 'failed_attempts', 20)
);

通过以上全面的安全防护措施,可以构建一个安全可靠的MySQL数据库环境。安全是一个持续的过程,需要定期审查、更新和改进安全策略,确保数据库始终处于最佳的安全状态。

记住,数据库安全不仅仅是技术问题,还涉及管理制度、人员培训和应急响应等多个方面。只有综合考虑这些因素,才能真正保障数据库的安全。

本站由 提供部署服务