add sockets and change subscription

This commit is contained in:
DenAntonov
2025-06-12 21:04:12 +03:00
parent bffa3fa2a3
commit 24ff712306
6 changed files with 199 additions and 83 deletions

View File

@@ -5,13 +5,29 @@ class ChatSocketHandler {
this.io = io;
this.onlineUsers = new Map(); // Хранение онлайн пользователей: socket.id -> user info
this.chatRooms = new Map(); // Хранение участников комнат: chat_id -> Set(socket.id)
this.realtimeSubscription = null; // Ссылка на подписку для управления
this.setupSocketHandlers();
try {
this.setupRealtimeSubscription(); // Добавляем Real-time подписки
} catch (error) {
// Ignore error
}
// Запускаем тестирование через 2 секунды после инициализации
setTimeout(() => {
this.testRealtimeConnection();
}, 2000);
// Проверяем статус подписки через 5 секунд
setTimeout(() => {
this.checkSubscriptionStatus();
}, 5000);
}
setupSocketHandlers() {
this.io.on('connection', (socket) => {
console.log(`User connected: ${socket.id}`);
// Аутентификация пользователя
socket.on('authenticate', async (data) => {
await this.handleAuthentication(socket, data);
@@ -84,10 +100,7 @@ class ChatSocketHandler {
message: 'Successfully authenticated',
user: userProfile
});
console.log(`User ${user_id} authenticated with socket ${socket.id}`);
} catch (error) {
console.error('Authentication error:', error);
socket.emit('auth_error', { message: 'Authentication failed' });
}
}
@@ -105,7 +118,6 @@ class ChatSocketHandler {
socket.emit('error', { message: 'chat_id is required' });
return;
}
// Проверяем, что чат существует и пользователь имеет доступ к нему
const supabase = getSupabaseClient();
const { data: chat, error } = await supabase
@@ -140,7 +152,6 @@ class ChatSocketHandler {
socket.emit('error', { message: 'Access denied to this chat' });
return;
}
// Добавляем сокет в комнату
socket.join(chat_id);
@@ -148,8 +159,11 @@ class ChatSocketHandler {
if (!this.chatRooms.has(chat_id)) {
this.chatRooms.set(chat_id, new Set());
}
const participantsBefore = this.chatRooms.get(chat_id).size;
this.chatRooms.get(chat_id).add(socket.id);
const participantsAfter = this.chatRooms.get(chat_id).size;
socket.emit('joined_chat', {
chat_id,
chat: chat,
@@ -158,15 +172,13 @@ class ChatSocketHandler {
// Уведомляем других участников о подключении
const userInfo = this.onlineUsers.get(socket.id);
socket.to(chat_id).emit('user_joined', {
chat_id,
user: userInfo?.profile,
timestamp: new Date()
});
console.log(`User ${socket.user_id} joined chat ${chat_id}`);
} catch (error) {
console.error('Join chat error:', error);
socket.emit('error', { message: 'Failed to join chat' });
}
}
@@ -196,7 +208,7 @@ class ChatSocketHandler {
timestamp: new Date()
});
console.log(`User ${socket.user_id} left chat ${chat_id}`);
}
async handleSendMessage(socket, data) {
@@ -243,9 +255,7 @@ class ChatSocketHandler {
timestamp: new Date()
});
console.log(`Message sent to chat ${chat_id} by user ${socket.user_id}`);
} catch (error) {
console.error('Send message error:', error);
socket.emit('error', { message: 'Failed to send message' });
}
}
@@ -277,7 +287,6 @@ class ChatSocketHandler {
}
handleDisconnect(socket) {
console.log(`User disconnected: ${socket.id}`);
// Удаляем пользователя из всех комнат
this.chatRooms.forEach((participants, chat_id) => {
@@ -326,11 +335,119 @@ class ChatSocketHandler {
timestamp: new Date()
});
}
// Тестирование Real-time подписки
async testRealtimeConnection() {
try {
const supabase = getSupabaseClient();
if (!supabase) {
return false;
}
// Создаем тестовый канал для проверки подключения
const testChannel = supabase
.channel('test_connection')
.subscribe((status, error) => {
if (status === 'SUBSCRIBED') {
// Отписываемся от тестового канала
setTimeout(() => {
testChannel.unsubscribe();
}, 2000);
}
});
return true;
} catch (error) {
return false;
}
}
// Проверка статуса подписки
checkSubscriptionStatus() {
if (this.realtimeSubscription) {
return true;
} else {
return false;
}
}
setupRealtimeSubscription() {
// Добавляем небольшую задержку, чтобы убедиться, что Supabase клиент инициализирован
setTimeout(() => {
this._doSetupRealtimeSubscription();
}, 1000);
}
_doSetupRealtimeSubscription() {
try {
const supabase = getSupabaseClient();
if (!supabase) {
return;
}
// Подписываемся на изменения в таблице messages
const subscription = supabase
.channel('messages_changes')
.on(
'postgres_changes',
{
event: 'INSERT',
schema: 'public',
table: 'messages'
},
async (payload) => {
try {
const newMessage = payload.new;
if (!newMessage) {
return;
}
if (!newMessage.chat_id) {
return;
}
// Получаем профиль пользователя
const { data: userProfile, error: profileError } = await supabase
.from('user_profiles')
.select('id, full_name, avatar_url')
.eq('id', newMessage.user_id)
.single();
// Объединяем сообщение с профилем
const messageWithProfile = {
...newMessage,
user_profiles: userProfile || null
};
// Проверяем, есть ли участники в чате
const chatRoomParticipants = this.chatRooms.get(newMessage.chat_id);
// Отправляем сообщение через Socket.IO всем участникам чата
this.io.to(newMessage.chat_id).emit('new_message', {
message: messageWithProfile,
timestamp: new Date()
});
} catch (callbackError) {
// Ignore error
}
}
)
.subscribe();
// Сохраняем ссылку на подписку для возможности отписки
this.realtimeSubscription = subscription;
} catch (error) {
// Ignore error
}
}
}
// Функция инициализации Socket.IO для чатов
function initializeChatSocket(io) {
const chatHandler = new ChatSocketHandler(io);
return chatHandler;
}