const { getSupabaseClient, initializationPromise } = require('./supabaseClient'); class ChatPollingHandler { constructor() { this.connectedClients = new Map(); // user_id -> { user_info, chats: Set(), lastActivity: Date } this.chatParticipants = new Map(); // chat_id -> Set(user_id) this.userEventQueues = new Map(); // user_id -> [{id, event, data, timestamp}] this.eventIdCounter = 0; this.realtimeSubscription = null; // Инициализируем Supabase подписку с задержкой и проверками this.initializeWithRetry(); // Очистка старых событий каждые 5 минут setInterval(() => { this.cleanupOldEvents(); }, 5 * 60 * 1000); } // Инициализация с повторными попытками async initializeWithRetry() { try { // Сначала ждем завершения основной инициализации await initializationPromise; this.setupRealtimeSubscription(); this.testRealtimeConnection(); return; } catch (error) { console.log('❌ [Supabase] Основная инициализация неудачна, пробуем альтернативный подход'); } // Если основная инициализация не удалась, используем повторные попытки let attempts = 0; const maxAttempts = 10; const baseDelay = 2000; // 2 секунды while (attempts < maxAttempts) { try { attempts++; // Ждем перед попыткой await new Promise(resolve => setTimeout(resolve, baseDelay * attempts)); // Проверяем готовность Supabase клиента const supabase = getSupabaseClient(); if (supabase) { this.setupRealtimeSubscription(); this.testRealtimeConnection(); return; // Успех, выходим } } catch (error) { console.log(`❌ [Supabase] Попытка #${attempts} неудачна:`, error.message); if (attempts === maxAttempts) { console.error('❌ [Supabase] Все попытки инициализации исчерпаны'); console.error('❌ [Supabase] Realtime подписка будет недоступна'); return; } } } } // Аутентификация пользователя async handleAuthentication(req, res) { const { user_id, token } = req.body; if (!user_id) { res.status(400).json({ error: 'user_id is required' }); return; } try { // Проверяем пользователя в базе данных const supabase = getSupabaseClient(); const { data: userProfile, error } = await supabase .from('user_profiles') .select('*') .eq('id', user_id) .single(); if (error) { console.log('❌ [Polling Server] Пользователь не найден:', error); res.status(401).json({ error: 'User not found' }); return; } // Регистрируем пользователя this.connectedClients.set(user_id, { user_info: { user_id, profile: userProfile, last_seen: new Date() }, chats: new Set(), lastActivity: new Date() }); // Создаем очередь событий для пользователя if (!this.userEventQueues.has(user_id)) { this.userEventQueues.set(user_id, []); } // Добавляем событие аутентификации в очередь this.addEventToQueue(user_id, 'authenticated', { message: 'Successfully authenticated', user: userProfile }); res.json({ success: true, message: 'Successfully authenticated', user: userProfile }); } catch (error) { console.error('❌ [Polling Server] Ошибка аутентификации:', error); res.status(500).json({ error: 'Authentication failed' }); } } // Эндпоинт для получения событий (polling) async handleGetEvents(req, res) { try { const { user_id, last_event_id } = req.query; if (!user_id) { res.status(400).json({ error: 'user_id is required' }); return; } const client = this.connectedClients.get(user_id); if (!client) { res.status(401).json({ error: 'Not authenticated' }); return; } // Обновляем время последней активности client.lastActivity = new Date(); // Получаем очередь событий пользователя const eventQueue = this.userEventQueues.get(user_id) || []; // Фильтруем события после last_event_id const lastEventId = parseInt(last_event_id) || 0; const newEvents = eventQueue.filter(event => event.id > lastEventId); res.json({ success: true, events: newEvents, last_event_id: eventQueue.length > 0 ? Math.max(...eventQueue.map(e => e.id)) : lastEventId }); } catch (error) { console.error('❌ [Polling Server] Ошибка получения событий:', error); res.status(500).json({ error: 'Failed to get events' }); } } // HTTP эндпоинт для присоединения к чату async handleJoinChat(req, res) { try { const { user_id, chat_id } = req.body; if (!user_id || !chat_id) { res.status(400).json({ error: 'user_id and chat_id are required' }); return; } const client = this.connectedClients.get(user_id); if (!client) { res.status(401).json({ error: 'Not authenticated' }); return; } // Проверяем, что чат существует и пользователь имеет доступ к нему const supabase = getSupabaseClient(); const { data: chat, error } = await supabase .from('chats') .select(` *, buildings ( management_company_id, apartments ( apartment_residents ( user_id ) ) ) `) .eq('id', chat_id) .single(); if (error || !chat) { res.status(404).json({ error: 'Chat not found' }); return; } // Проверяем доступ пользователя к чату через квартиры в доме const hasAccess = chat.buildings.apartments.some(apartment => apartment.apartment_residents.some(resident => resident.user_id === user_id ) ); if (!hasAccess) { res.status(403).json({ error: 'Access denied to this chat' }); return; } // Добавляем пользователя в чат client.chats.add(chat_id); if (!this.chatParticipants.has(chat_id)) { this.chatParticipants.set(chat_id, new Set()); } this.chatParticipants.get(chat_id).add(user_id); // Добавляем событие присоединения в очередь пользователя this.addEventToQueue(user_id, 'joined_chat', { chat_id, chat: chat, message: 'Successfully joined chat' }); // Уведомляем других участников о подключении this.broadcastToChatExcludeUser(chat_id, user_id, 'user_joined', { chat_id, user: client.user_info.profile, timestamp: new Date() }); res.json({ success: true, message: 'Joined chat successfully' }); } catch (error) { res.status(500).json({ error: 'Failed to join chat' }); } } // HTTP эндпоинт для покидания чата async handleLeaveChat(req, res) { try { const { user_id, chat_id } = req.body; if (!user_id || !chat_id) { res.status(400).json({ error: 'user_id and chat_id are required' }); return; } const client = this.connectedClients.get(user_id); if (!client) { res.status(401).json({ error: 'Not authenticated' }); return; } // Удаляем пользователя из чата client.chats.delete(chat_id); if (this.chatParticipants.has(chat_id)) { this.chatParticipants.get(chat_id).delete(user_id); // Если чат пуст, удаляем его if (this.chatParticipants.get(chat_id).size === 0) { this.chatParticipants.delete(chat_id); } } // Уведомляем других участников об отключении this.broadcastToChatExcludeUser(chat_id, user_id, 'user_left', { chat_id, user: client.user_info.profile, timestamp: new Date() }); res.json({ success: true, message: 'Left chat successfully' }); } catch (error) { res.status(500).json({ error: 'Failed to leave chat' }); } } // HTTP эндпоинт для отправки сообщения async handleSendMessage(req, res) { try { const { user_id, chat_id, text } = req.body; if (!user_id || !chat_id || !text) { res.status(400).json({ error: 'user_id, chat_id and text are required' }); return; } const client = this.connectedClients.get(user_id); if (!client) { res.status(401).json({ error: 'Not authenticated' }); return; } if (!client.chats.has(chat_id)) { res.status(403).json({ error: 'Not joined to this chat' }); return; } // Сохраняем сообщение в базу данных const supabase = getSupabaseClient(); const { data: message, error } = await supabase .from('messages') .insert({ chat_id, user_id, text }) .select(` *, user_profiles ( id, full_name, avatar_url ) `) .single(); if (error) { res.status(500).json({ error: 'Failed to save message' }); return; } // Отправляем сообщение всем участникам чата this.broadcastToChat(chat_id, 'new_message', { message, timestamp: new Date() }); res.json({ success: true, message: 'Message sent successfully' }); } catch (error) { res.status(500).json({ error: 'Failed to send message' }); } } // HTTP эндпоинт для индикации печатания async handleTypingStart(req, res) { try { const { user_id, chat_id } = req.body; if (!user_id || !chat_id) { res.status(400).json({ error: 'user_id and chat_id are required' }); return; } const client = this.connectedClients.get(user_id); if (!client) { res.status(401).json({ error: 'Not authenticated' }); return; } if (!client.chats.has(chat_id)) { res.status(403).json({ error: 'Not joined to this chat' }); return; } this.broadcastToChatExcludeUser(chat_id, user_id, 'user_typing_start', { chat_id, user: client.user_info.profile, timestamp: new Date() }); res.json({ success: true }); } catch (error) { res.status(500).json({ error: 'Failed to send typing indicator' }); } } // HTTP эндпоинт для остановки индикации печатания async handleTypingStop(req, res) { try { const { user_id, chat_id } = req.body; if (!user_id || !chat_id) { res.status(400).json({ error: 'user_id and chat_id are required' }); return; } const client = this.connectedClients.get(user_id); if (!client) { res.status(401).json({ error: 'Not authenticated' }); return; } if (!client.chats.has(chat_id)) { res.status(403).json({ error: 'Not joined to this chat' }); return; } this.broadcastToChatExcludeUser(chat_id, user_id, 'user_typing_stop', { chat_id, user: client.user_info.profile, timestamp: new Date() }); res.json({ success: true }); } catch (error) { res.status(500).json({ error: 'Failed to send typing indicator' }); } } // Обработка отключения клиента handleClientDisconnect(user_id) { const client = this.connectedClients.get(user_id); if (!client) return; // Удаляем пользователя из всех чатов client.chats.forEach(chat_id => { if (this.chatParticipants.has(chat_id)) { this.chatParticipants.get(chat_id).delete(user_id); // Уведомляем других участников об отключении this.broadcastToChatExcludeUser(chat_id, user_id, 'user_left', { chat_id, user: client.user_info.profile, timestamp: new Date() }); // Если чат пуст, удаляем его if (this.chatParticipants.get(chat_id).size === 0) { this.chatParticipants.delete(chat_id); } } }); // Удаляем клиента this.connectedClients.delete(user_id); } // Добавление события в очередь пользователя addEventToQueue(user_id, event, data) { if (!this.userEventQueues.has(user_id)) { this.userEventQueues.set(user_id, []); } const eventQueue = this.userEventQueues.get(user_id); const eventId = ++this.eventIdCounter; eventQueue.push({ id: eventId, event, data, timestamp: new Date() }); // Ограничиваем размер очереди (последние 100 событий) if (eventQueue.length > 100) { eventQueue.splice(0, eventQueue.length - 100); } } // Рассылка события всем участникам чата broadcastToChat(chat_id, event, data) { const participants = this.chatParticipants.get(chat_id); if (!participants) return; participants.forEach(user_id => { this.addEventToQueue(user_id, event, data); }); } // Рассылка события всем участникам чата кроме отправителя broadcastToChatExcludeUser(chat_id, exclude_user_id, event, data) { const participants = this.chatParticipants.get(chat_id); if (!participants) return; participants.forEach(user_id => { if (user_id !== exclude_user_id) { this.addEventToQueue(user_id, event, data); } }); } // Получение списка онлайн пользователей в чате getOnlineUsersInChat(chat_id) { const participants = this.chatParticipants.get(chat_id) || new Set(); const onlineUsers = []; const now = new Date(); const ONLINE_THRESHOLD = 2 * 60 * 1000; // 2 минуты participants.forEach(user_id => { const client = this.connectedClients.get(user_id); if (client && (now - client.lastActivity) < ONLINE_THRESHOLD) { onlineUsers.push(client.user_info.profile); } }); return onlineUsers; } // Отправка системного сообщения в чат async sendSystemMessage(chat_id, text) { this.broadcastToChat(chat_id, 'system_message', { chat_id, text, timestamp: new Date() }); } // Очистка старых событий cleanupOldEvents() { const now = new Date(); const MAX_EVENT_AGE = 24 * 60 * 60 * 1000; // 24 часа const INACTIVE_USER_THRESHOLD = 60 * 60 * 1000; // 1 час // Очищаем старые события this.userEventQueues.forEach((eventQueue, user_id) => { const filteredEvents = eventQueue.filter(event => (now - event.timestamp) < MAX_EVENT_AGE ); if (filteredEvents.length !== eventQueue.length) { this.userEventQueues.set(user_id, filteredEvents); } }); // Удаляем неактивных пользователей this.connectedClients.forEach((client, user_id) => { if ((now - client.lastActivity) > INACTIVE_USER_THRESHOLD) { this.handleClientDisconnect(user_id); this.userEventQueues.delete(user_id); } }); } // Тестирование Real-time подписки async testRealtimeConnection() { try { const supabase = getSupabaseClient(); if (!supabase) { return false; } // Создаем тестовый канал для проверки подключения const testChannel = supabase .channel('test_connection') .subscribe((status, error) => { if (error) { console.error('❌ [Supabase] Тестовый канал - ошибка:', error); } if (status === 'SUBSCRIBED') { // Отписываемся от тестового канала setTimeout(() => { testChannel.unsubscribe(); }, 2000); } }); return true; } catch (error) { console.error('❌ [Supabase] Ошибка тестирования Realtime:', error); return false; } } // Проверка статуса подписки checkSubscriptionStatus() { if (this.realtimeSubscription) { return true; } else { return false; } } setupRealtimeSubscription() { // Убираем setTimeout, вызываем сразу this._doSetupRealtimeSubscription(); } _doSetupRealtimeSubscription() { try { const supabase = getSupabaseClient(); if (!supabase) { console.log('❌ [Supabase] Supabase клиент не найден'); throw new Error('Supabase client not available'); } // Подписываемся на изменения в таблице messages const subscription = supabase .channel('messages_changes') .on( 'postgres_changes', { event: 'INSERT', schema: 'public', table: 'messages' }, async (payload) => { try { const newMessage = payload.new; if (!newMessage) { return; } if (!newMessage.chat_id) { return; } // Получаем профиль пользователя const { data: userProfile, error: profileError } = await supabase .from('user_profiles') .select('id, full_name, avatar_url') .eq('id', newMessage.user_id) .single(); if (profileError) { console.error('❌ [Supabase] Ошибка получения профиля пользователя:', profileError); } // Объединяем сообщение с профилем const messageWithProfile = { ...newMessage, user_profiles: userProfile || null }; // Отправляем сообщение всем участникам чата this.broadcastToChat(newMessage.chat_id, 'new_message', { message: messageWithProfile, timestamp: new Date() }); } catch (callbackError) { console.error('❌ [Supabase] Ошибка в обработчике сообщения:', callbackError); } } ) .subscribe((status, error) => { if (error) { console.error('❌ [Supabase] Ошибка подписки:', error); } if (status === 'CHANNEL_ERROR') { console.error('❌ [Supabase] Ошибка канала'); } else if (status === 'TIMED_OUT') { console.error('❌ [Supabase] Таймаут подписки'); } }); // Сохраняем ссылку на подписку для возможности отписки this.realtimeSubscription = subscription; } catch (error) { console.error('❌ [Supabase] Критическая ошибка при настройке подписки:', error); throw error; // Пробрасываем ошибку для обработки в initializeWithRetry } } // Получение статистики подключений getConnectionStats() { return { connectedClients: this.connectedClients.size, activeChats: this.chatParticipants.size, totalChatParticipants: Array.from(this.chatParticipants.values()) .reduce((total, participants) => total + participants.size, 0), totalEventQueues: this.userEventQueues.size, totalEvents: Array.from(this.userEventQueues.values()) .reduce((total, queue) => total + queue.length, 0) }; } } // Функция для создания роутера с polling эндпоинтами function createChatPollingRouter(express) { const router = express.Router(); const chatHandler = new ChatPollingHandler(); // CORS middleware для всех запросов router.use((req, res, next) => { res.header('Access-Control-Allow-Origin', '*'); res.header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS'); res.header('Access-Control-Allow-Headers', 'Content-Type, Cache-Control, Authorization'); res.header('Access-Control-Allow-Credentials', 'true'); // Обрабатываем OPTIONS запросы if (req.method === 'OPTIONS') { res.status(200).end(); return; } next(); }); // Эндпоинт для аутентификации router.post('/auth', (req, res) => { chatHandler.handleAuthentication(req, res); }); // Эндпоинт для получения событий (polling) router.get('/events', (req, res) => { chatHandler.handleGetEvents(req, res); }); // HTTP эндпоинты для действий router.post('/join-chat', (req, res) => { chatHandler.handleJoinChat(req, res); }); router.post('/leave-chat', (req, res) => { chatHandler.handleLeaveChat(req, res); }); router.post('/send-message', (req, res) => { chatHandler.handleSendMessage(req, res); }); router.post('/typing-start', (req, res) => { chatHandler.handleTypingStart(req, res); }); router.post('/typing-stop', (req, res) => { chatHandler.handleTypingStop(req, res); }); // Эндпоинт для получения онлайн пользователей в чате router.get('/online-users/:chat_id', (req, res) => { const { chat_id } = req.params; const onlineUsers = chatHandler.getOnlineUsersInChat(chat_id); res.json({ onlineUsers }); }); // Эндпоинт для получения статистики router.get('/stats', (req, res) => { const stats = chatHandler.getConnectionStats(); res.json(stats); }); // Эндпоинт для проверки статуса Supabase подписки router.get('/supabase-status', (req, res) => { const isConnected = chatHandler.checkSubscriptionStatus(); res.json({ supabaseSubscriptionActive: isConnected, subscriptionExists: !!chatHandler.realtimeSubscription, subscriptionInfo: chatHandler.realtimeSubscription ? { channel: chatHandler.realtimeSubscription.topic, state: chatHandler.realtimeSubscription.state } : null }); }); // Эндпоинт для принудительного переподключения к Supabase router.post('/reconnect-supabase', (req, res) => { try { // Отписываемся от текущей подписки if (chatHandler.realtimeSubscription) { chatHandler.realtimeSubscription.unsubscribe(); chatHandler.realtimeSubscription = null; } // Создаем новую подписку chatHandler.setupRealtimeSubscription(); res.json({ success: true, message: 'Reconnection initiated' }); } catch (error) { console.error('❌ [Polling Server] Ошибка переподключения:', error); res.status(500).json({ success: false, error: 'Reconnection failed', details: error.message }); } }); // Тестовый эндпоинт для создания сообщения в обход API router.post('/test-message', async (req, res) => { const { chat_id, user_id, text } = req.body; if (!chat_id || !user_id || !text) { res.status(400).json({ error: 'chat_id, user_id и text обязательны' }); return; } try { // Создаем тестовое событие напрямую chatHandler.broadcastToChat(chat_id, 'new_message', { message: { id: `test_${Date.now()}`, chat_id, user_id, text, created_at: new Date().toISOString(), user_profiles: { id: user_id, full_name: 'Test User', avatar_url: null } }, timestamp: new Date() }); res.json({ success: true, message: 'Test message sent to polling clients' }); } catch (error) { console.error('❌ [Polling Server] Ошибка отправки тестового сообщения:', error); res.status(500).json({ success: false, error: 'Failed to send test message', details: error.message }); } }); return { router, chatHandler }; } module.exports = { ChatPollingHandler, createChatPollingRouter };