1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
| from typing import Dict, Set import uuid import logging
logger = logging.getLogger(__name__)
class ConnectionManager: def __init__(self): self.active_connections: Dict[str, WebSocket] = {} self.rooms: Dict[str, Set[str]] = {} self.user_connections: Dict[str, str] = {} async def connect(self, websocket: WebSocket, user_id: str = None) -> str: """建立连接""" await websocket.accept() connection_id = str(uuid.uuid4()) self.active_connections[connection_id] = websocket if user_id: if user_id in self.user_connections: old_connection_id = self.user_connections[user_id] await self.disconnect(old_connection_id) self.user_connections[user_id] = connection_id logger.info(f"New connection: {connection_id}, User: {user_id}") return connection_id async def disconnect(self, connection_id: str): """断开连接""" if connection_id in self.active_connections: websocket = self.active_connections[connection_id] for room_id in list(self.rooms.keys()): if connection_id in self.rooms[room_id]: self.rooms[room_id].remove(connection_id) if not self.rooms[room_id]: del self.rooms[room_id] user_id = None for uid, cid in self.user_connections.items(): if cid == connection_id: user_id = uid break if user_id: del self.user_connections[user_id] try: await websocket.close() except: pass del self.active_connections[connection_id] logger.info(f"Connection disconnected: {connection_id}, User: {user_id}") async def send_personal_message(self, message: dict, connection_id: str): """发送个人消息""" if connection_id in self.active_connections: websocket = self.active_connections[connection_id] try: await websocket.send_text(json.dumps(message)) except: await self.disconnect(connection_id) async def send_to_user(self, message: dict, user_id: str): """发送消息给特定用户""" if user_id in self.user_connections: connection_id = self.user_connections[user_id] await self.send_personal_message(message, connection_id) async def join_room(self, connection_id: str, room_id: str): """加入房间""" if room_id not in self.rooms: self.rooms[room_id] = set() self.rooms[room_id].add(connection_id) logger.info(f"Connection {connection_id} joined room {room_id}") async def leave_room(self, connection_id: str, room_id: str): """离开房间""" if room_id in self.rooms and connection_id in self.rooms[room_id]: self.rooms[room_id].remove(connection_id) if not self.rooms[room_id]: del self.rooms[room_id] logger.info(f"Connection {connection_id} left room {room_id}") async def broadcast_to_room(self, message: dict, room_id: str, exclude_connection: str = None): """向房间广播消息""" if room_id not in self.rooms: return disconnected_connections = [] for connection_id in self.rooms[room_id]: if connection_id == exclude_connection: continue try: websocket = self.active_connections[connection_id] await websocket.send_text(json.dumps(message)) except: disconnected_connections.append(connection_id) for connection_id in disconnected_connections: await self.disconnect(connection_id)
manager = ConnectionManager()
@app.websocket("/ws/{user_id}") async def websocket_endpoint(websocket: WebSocket, user_id: str): connection_id = await manager.connect(websocket, user_id) try: while True: data = await websocket.receive_text() message = json.loads(data) await handle_message(message, connection_id, user_id) except WebSocketDisconnect: await manager.disconnect(connection_id)
async def handle_message(message: dict, connection_id: str, user_id: str): """处理WebSocket消息""" message_type = message.get("type") if message_type == "join_room": room_id = message.get("room_id") await manager.join_room(connection_id, room_id) await manager.broadcast_to_room({ "type": "user_joined", "user_id": user_id, "room_id": room_id, "timestamp": time.time() }, room_id, exclude_connection=connection_id) elif message_type == "room_message": room_id = message.get("room_id") content = message.get("content") await manager.broadcast_to_room({ "type": "room_message", "user_id": user_id, "room_id": room_id, "content": content, "timestamp": time.time() }, room_id)
|