1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
| import pymysql import time import json from datetime import datetime
class QueryMonitor: def __init__(self, db_config): self.db_config = db_config self.connection = None def connect(self): self.connection = pymysql.connect(**self.db_config) def get_running_queries(self): """获取当前运行的查询""" with self.connection.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(""" SELECT ID, USER, HOST, DB, COMMAND, TIME, STATE, INFO FROM information_schema.PROCESSLIST WHERE COMMAND != 'Sleep' AND TIME > 1 ORDER BY TIME DESC """) return cursor.fetchall() def get_slow_queries(self, limit=10): """获取最慢的查询""" with self.connection.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(""" SELECT DIGEST_TEXT, COUNT_STAR as execution_count, ROUND(SUM_TIMER_WAIT/1000000000, 2) as total_time_sec, ROUND(AVG_TIMER_WAIT/1000000000, 2) as avg_time_sec, ROUND(MAX_TIMER_WAIT/1000000000, 2) as max_time_sec, SUM_ROWS_EXAMINED, SUM_ROWS_SENT FROM performance_schema.events_statements_summary_by_digest WHERE DIGEST_TEXT IS NOT NULL ORDER BY SUM_TIMER_WAIT DESC LIMIT %s """, (limit,)) return cursor.fetchall() def kill_long_running_query(self, process_id): """终止长时间运行的查询""" with self.connection.cursor() as cursor: cursor.execute(f"KILL {process_id}") print(f"已终止查询进程: {process_id}") def monitor_loop(self, interval=30): """监控循环""" while True: try: print(f"\n=== 查询监控报告 {datetime.now()} ===") running_queries = self.get_running_queries() if running_queries: print(f"发现 {len(running_queries)} 个运行中的查询:") for query in running_queries: print(f" ID: {query['ID']}, 用户: {query['USER']}, " f"运行时间: {query['TIME']}秒") if query['TIME'] > 300: print(f" 警告: 长时间运行的查询!") slow_queries = self.get_slow_queries(5) if slow_queries: print(f"\n最慢的 {len(slow_queries)} 个查询:") for i, query in enumerate(slow_queries, 1): print(f" {i}. 平均耗时: {query['avg_time_sec']}秒, " f"执行次数: {query['execution_count']}") print(f" 查询: {query['DIGEST_TEXT'][:100]}...") time.sleep(interval) except KeyboardInterrupt: print("\n监控已停止") break except Exception as e: print(f"监控错误: {e}") time.sleep(interval)
if __name__ == "__main__": db_config = { 'host': 'localhost', 'user': 'monitor_user', 'password': 'monitor_pass', 'database': 'performance_schema', 'charset': 'utf8mb4' } monitor = QueryMonitor(db_config) monitor.connect() monitor.monitor_loop(30)
|