1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| import pymysql import random from contextlib import contextmanager
class MySQLCluster: def __init__(self, master_config, slave_configs): self.master_config = master_config self.slave_configs = slave_configs @contextmanager def get_write_connection(self): """获取写连接(主服务器)""" conn = pymysql.connect(**self.master_config) try: yield conn finally: conn.close() @contextmanager def get_read_connection(self): """获取读连接(从服务器)""" slave_config = random.choice(self.slave_configs) conn = pymysql.connect(**slave_config) try: yield conn finally: conn.close() def execute_write(self, sql, params=None): """执行写操作""" with self.get_write_connection() as conn: with conn.cursor() as cursor: cursor.execute(sql, params) conn.commit() return cursor.rowcount def execute_read(self, sql, params=None): """执行读操作""" with self.get_read_connection() as conn: with conn.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(sql, params) return cursor.fetchall() def execute_read_one(self, sql, params=None): """执行读操作(单行)""" with self.get_read_connection() as conn: with conn.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(sql, params) return cursor.fetchone()
master_config = { 'host': '192.168.1.100', 'user': 'app_user', 'password': 'app_pass', 'database': 'myapp', 'charset': 'utf8mb4' }
slave_configs = [ { 'host': '192.168.1.101', 'user': 'app_user', 'password': 'app_pass', 'database': 'myapp', 'charset': 'utf8mb4' }, { 'host': '192.168.1.102', 'user': 'app_user', 'password': 'app_pass', 'database': 'myapp', 'charset': 'utf8mb4' } ]
cluster = MySQLCluster(master_config, slave_configs)
cluster.execute_write( "INSERT INTO users (name, email) VALUES (%s, %s)", ('张三', 'zhangsan@example.com') )
users = cluster.execute_read("SELECT * FROM users WHERE status = %s", ('active',)) user = cluster.execute_read_one("SELECT * FROM users WHERE id = %s", (1,))
|