1034 lines
39 KiB
JavaScript
1034 lines
39 KiB
JavaScript
const { getSupabaseClient, initializationPromise } = require('./supabaseClient');
|
||
const MODERATION_CONFIG = require('./chat-ai-agent/moderation-config');
|
||
const { moderationText } = require('./chat-ai-agent/chat-moderation');
|
||
const { getIo } = require('../../../io');
|
||
|
||
class ChatPollingHandler {
|
||
constructor() {
|
||
this.connectedClients = new Map(); // user_id -> { user_info, chats: Set(), lastActivity: Date }
|
||
this.chatParticipants = new Map(); // chat_id -> Set(user_id)
|
||
this.userEventQueues = new Map(); // user_id -> [{id, event, data, timestamp}]
|
||
this.eventIdCounter = 0;
|
||
this.realtimeSubscription = null;
|
||
|
||
// Инициализируем Supabase подписку с задержкой и проверками
|
||
this.initializeWithRetry();
|
||
|
||
// Очистка старых событий каждые 5 минут
|
||
setInterval(() => {
|
||
this.cleanupOldEvents();
|
||
}, 5 * 60 * 1000);
|
||
}
|
||
|
||
// Инициализация с повторными попытками
|
||
async initializeWithRetry() {
|
||
try {
|
||
// Сначала ждем завершения основной инициализации
|
||
await initializationPromise;
|
||
|
||
this.setupRealtimeSubscription();
|
||
this.testRealtimeConnection();
|
||
return;
|
||
|
||
} catch (error) {
|
||
console.log('❌ [Supabase] Основная инициализация неудачна, пробуем альтернативный подход');
|
||
}
|
||
|
||
// Если основная инициализация не удалась, используем повторные попытки
|
||
let attempts = 0;
|
||
const maxAttempts = 10;
|
||
const baseDelay = 2000; // 2 секунды
|
||
|
||
while (attempts < maxAttempts) {
|
||
try {
|
||
attempts++;
|
||
|
||
// Ждем перед попыткой
|
||
await new Promise(resolve => setTimeout(resolve, baseDelay * attempts));
|
||
|
||
// Проверяем готовность Supabase клиента
|
||
const supabase = getSupabaseClient();
|
||
if (supabase) {
|
||
this.setupRealtimeSubscription();
|
||
this.testRealtimeConnection();
|
||
return; // Успех, выходим
|
||
}
|
||
} catch (error) {
|
||
console.log(`❌ [Supabase] Попытка #${attempts} неудачна:`, error.message);
|
||
|
||
if (attempts === maxAttempts) {
|
||
console.error('❌ [Supabase] Все попытки инициализации исчерпаны');
|
||
console.error('❌ [Supabase] Realtime подписка будет недоступна');
|
||
return;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// Аутентификация пользователя
|
||
async handleAuthentication(req, res) {
|
||
const { user_id, token } = req.body;
|
||
|
||
if (!user_id) {
|
||
res.status(400).json({ error: 'user_id is required' });
|
||
return;
|
||
}
|
||
|
||
try {
|
||
// Проверяем пользователя в базе данных
|
||
const supabase = getSupabaseClient();
|
||
const { data: userProfile, error } = await supabase
|
||
.from('user_profiles')
|
||
.select('*')
|
||
.eq('id', user_id)
|
||
.single();
|
||
|
||
if (error) {
|
||
console.log('❌ [Polling Server] Пользователь не найден:', error);
|
||
res.status(401).json({ error: 'User not found' });
|
||
return;
|
||
}
|
||
|
||
// Регистрируем пользователя
|
||
this.connectedClients.set(user_id, {
|
||
user_info: {
|
||
user_id,
|
||
profile: userProfile,
|
||
last_seen: new Date()
|
||
},
|
||
chats: new Set(),
|
||
lastActivity: new Date()
|
||
});
|
||
|
||
// Создаем очередь событий для пользователя
|
||
if (!this.userEventQueues.has(user_id)) {
|
||
this.userEventQueues.set(user_id, []);
|
||
}
|
||
|
||
// Добавляем событие аутентификации в очередь
|
||
this.addEventToQueue(user_id, 'authenticated', {
|
||
message: 'Successfully authenticated',
|
||
user: userProfile
|
||
});
|
||
|
||
res.json({
|
||
success: true,
|
||
message: 'Successfully authenticated',
|
||
user: userProfile
|
||
});
|
||
|
||
} catch (error) {
|
||
console.error('❌ [Polling Server] Ошибка аутентификации:', error);
|
||
res.status(500).json({ error: 'Authentication failed' });
|
||
}
|
||
}
|
||
|
||
// Эндпоинт для получения событий (polling)
|
||
async handleGetEvents(req, res) {
|
||
try {
|
||
const { user_id, last_event_id } = req.query;
|
||
|
||
if (!user_id) {
|
||
res.status(400).json({ error: 'user_id is required' });
|
||
return;
|
||
}
|
||
|
||
const client = this.connectedClients.get(user_id);
|
||
if (!client) {
|
||
res.status(401).json({ error: 'Not authenticated' });
|
||
return;
|
||
}
|
||
|
||
// Обновляем время последней активности
|
||
client.lastActivity = new Date();
|
||
|
||
// Получаем очередь событий пользователя
|
||
const eventQueue = this.userEventQueues.get(user_id) || [];
|
||
|
||
// Фильтруем события после last_event_id
|
||
const lastEventId = parseInt(last_event_id) || 0;
|
||
const newEvents = eventQueue.filter(event => event.id > lastEventId);
|
||
|
||
// Логируем отправку событий клиенту
|
||
if (newEvents.length > 0) {
|
||
console.log(`📨 [Polling Server] Отправляем ${newEvents.length} событий клиенту ${user_id}`);
|
||
newEvents.forEach(event => {
|
||
if (event.event === 'message_updated') {
|
||
console.log(`📨 [Polling Server] → Событие: ${event.event}, Сообщение ID: ${event.data?.message?.id}, Текст: "${event.data?.message?.text?.substring(0, 50)}${(event.data?.message?.text?.length || 0) > 50 ? '...' : ''}"`);
|
||
}
|
||
});
|
||
}
|
||
|
||
res.json({
|
||
success: true,
|
||
events: newEvents,
|
||
last_event_id: eventQueue.length > 0 ? Math.max(...eventQueue.map(e => e.id)) : lastEventId
|
||
});
|
||
|
||
} catch (error) {
|
||
console.error('❌ [Polling Server] Ошибка получения событий:', error);
|
||
res.status(500).json({ error: 'Failed to get events' });
|
||
}
|
||
}
|
||
|
||
// HTTP эндпоинт для присоединения к чату
|
||
async handleJoinChat(req, res) {
|
||
try {
|
||
const { user_id, chat_id } = req.body;
|
||
|
||
if (!user_id || !chat_id) {
|
||
res.status(400).json({ error: 'user_id and chat_id are required' });
|
||
return;
|
||
}
|
||
|
||
const client = this.connectedClients.get(user_id);
|
||
if (!client) {
|
||
res.status(401).json({ error: 'Not authenticated' });
|
||
return;
|
||
}
|
||
|
||
// Проверяем, что чат существует и пользователь имеет доступ к нему
|
||
const supabase = getSupabaseClient();
|
||
const { data: chat, error } = await supabase
|
||
.from('chats')
|
||
.select(`
|
||
*,
|
||
buildings (
|
||
management_company_id,
|
||
apartments (
|
||
apartment_residents (
|
||
user_id
|
||
)
|
||
)
|
||
)
|
||
`)
|
||
.eq('id', chat_id)
|
||
.single();
|
||
|
||
if (error || !chat) {
|
||
res.status(404).json({ error: 'Chat not found' });
|
||
return;
|
||
}
|
||
|
||
// Проверяем доступ пользователя к чату через квартиры в доме
|
||
const hasAccess = chat.buildings.apartments.some(apartment =>
|
||
apartment.apartment_residents.some(resident =>
|
||
resident.user_id === user_id
|
||
)
|
||
);
|
||
|
||
if (!hasAccess) {
|
||
res.status(403).json({ error: 'Access denied to this chat' });
|
||
return;
|
||
}
|
||
|
||
// Добавляем пользователя в чат
|
||
client.chats.add(chat_id);
|
||
|
||
if (!this.chatParticipants.has(chat_id)) {
|
||
this.chatParticipants.set(chat_id, new Set());
|
||
}
|
||
this.chatParticipants.get(chat_id).add(user_id);
|
||
|
||
// Добавляем событие присоединения в очередь пользователя
|
||
this.addEventToQueue(user_id, 'joined_chat', {
|
||
chat_id,
|
||
chat: chat,
|
||
message: 'Successfully joined chat'
|
||
});
|
||
|
||
// Уведомляем других участников о подключении
|
||
this.broadcastToChatExcludeUser(chat_id, user_id, 'user_joined', {
|
||
chat_id,
|
||
user: client.user_info.profile,
|
||
timestamp: new Date()
|
||
});
|
||
|
||
res.json({ success: true, message: 'Joined chat successfully' });
|
||
|
||
} catch (error) {
|
||
res.status(500).json({ error: 'Failed to join chat' });
|
||
}
|
||
}
|
||
|
||
// HTTP эндпоинт для покидания чата
|
||
async handleLeaveChat(req, res) {
|
||
try {
|
||
const { user_id, chat_id } = req.body;
|
||
|
||
if (!user_id || !chat_id) {
|
||
res.status(400).json({ error: 'user_id and chat_id are required' });
|
||
return;
|
||
}
|
||
|
||
const client = this.connectedClients.get(user_id);
|
||
if (!client) {
|
||
res.status(401).json({ error: 'Not authenticated' });
|
||
return;
|
||
}
|
||
|
||
// Удаляем пользователя из чата
|
||
client.chats.delete(chat_id);
|
||
|
||
if (this.chatParticipants.has(chat_id)) {
|
||
this.chatParticipants.get(chat_id).delete(user_id);
|
||
|
||
// Если чат пуст, удаляем его
|
||
if (this.chatParticipants.get(chat_id).size === 0) {
|
||
this.chatParticipants.delete(chat_id);
|
||
}
|
||
}
|
||
|
||
// Уведомляем других участников об отключении
|
||
this.broadcastToChatExcludeUser(chat_id, user_id, 'user_left', {
|
||
chat_id,
|
||
user: client.user_info.profile,
|
||
timestamp: new Date()
|
||
});
|
||
|
||
res.json({ success: true, message: 'Left chat successfully' });
|
||
|
||
} catch (error) {
|
||
res.status(500).json({ error: 'Failed to leave chat' });
|
||
}
|
||
}
|
||
|
||
// HTTP эндпоинт для отправки сообщения
|
||
async handleSendMessage(req, res) {
|
||
try {
|
||
const { user_id, chat_id, text } = req.body;
|
||
|
||
if (!user_id || !chat_id || !text) {
|
||
res.status(400).json({ error: 'user_id, chat_id and text are required' });
|
||
return;
|
||
}
|
||
|
||
const client = this.connectedClients.get(user_id);
|
||
if (!client) {
|
||
res.status(401).json({ error: 'Not authenticated' });
|
||
return;
|
||
}
|
||
|
||
if (!client.chats.has(chat_id)) {
|
||
res.status(403).json({ error: 'Not joined to this chat' });
|
||
return;
|
||
}
|
||
|
||
// Сохраняем сообщение в базу данных
|
||
const supabase = getSupabaseClient();
|
||
const { data: message, error } = await supabase
|
||
.from('messages')
|
||
.insert({
|
||
chat_id,
|
||
user_id,
|
||
text
|
||
})
|
||
.select(`
|
||
*,
|
||
user_profiles (
|
||
id,
|
||
full_name,
|
||
avatar_url
|
||
)
|
||
`)
|
||
.single();
|
||
|
||
if (error) {
|
||
res.status(500).json({ error: 'Failed to save message' });
|
||
return;
|
||
}
|
||
|
||
// Отправляем сообщение всем участникам чата
|
||
this.broadcastToChat(chat_id, 'new_message', {
|
||
message,
|
||
timestamp: new Date()
|
||
});
|
||
|
||
res.json({ success: true, message: 'Message sent successfully' });
|
||
|
||
} catch (error) {
|
||
res.status(500).json({ error: 'Failed to send message' });
|
||
}
|
||
}
|
||
|
||
// HTTP эндпоинт для индикации печатания
|
||
async handleTypingStart(req, res) {
|
||
try {
|
||
const { user_id, chat_id } = req.body;
|
||
|
||
if (!user_id || !chat_id) {
|
||
res.status(400).json({ error: 'user_id and chat_id are required' });
|
||
return;
|
||
}
|
||
|
||
const client = this.connectedClients.get(user_id);
|
||
if (!client) {
|
||
res.status(401).json({ error: 'Not authenticated' });
|
||
return;
|
||
}
|
||
|
||
if (!client.chats.has(chat_id)) {
|
||
res.status(403).json({ error: 'Not joined to this chat' });
|
||
return;
|
||
}
|
||
|
||
this.broadcastToChatExcludeUser(chat_id, user_id, 'user_typing_start', {
|
||
chat_id,
|
||
user: client.user_info.profile,
|
||
timestamp: new Date()
|
||
});
|
||
|
||
res.json({ success: true });
|
||
|
||
} catch (error) {
|
||
res.status(500).json({ error: 'Failed to send typing indicator' });
|
||
}
|
||
}
|
||
|
||
// HTTP эндпоинт для остановки индикации печатания
|
||
async handleTypingStop(req, res) {
|
||
try {
|
||
const { user_id, chat_id } = req.body;
|
||
|
||
if (!user_id || !chat_id) {
|
||
res.status(400).json({ error: 'user_id and chat_id are required' });
|
||
return;
|
||
}
|
||
|
||
const client = this.connectedClients.get(user_id);
|
||
if (!client) {
|
||
res.status(401).json({ error: 'Not authenticated' });
|
||
return;
|
||
}
|
||
|
||
if (!client.chats.has(chat_id)) {
|
||
res.status(403).json({ error: 'Not joined to this chat' });
|
||
return;
|
||
}
|
||
|
||
this.broadcastToChatExcludeUser(chat_id, user_id, 'user_typing_stop', {
|
||
chat_id,
|
||
user: client.user_info.profile,
|
||
timestamp: new Date()
|
||
});
|
||
|
||
res.json({ success: true });
|
||
|
||
} catch (error) {
|
||
res.status(500).json({ error: 'Failed to send typing indicator' });
|
||
}
|
||
}
|
||
|
||
// Обработка отключения клиента
|
||
handleClientDisconnect(user_id) {
|
||
const client = this.connectedClients.get(user_id);
|
||
if (!client) return;
|
||
|
||
// Удаляем пользователя из всех чатов
|
||
client.chats.forEach(chat_id => {
|
||
if (this.chatParticipants.has(chat_id)) {
|
||
this.chatParticipants.get(chat_id).delete(user_id);
|
||
|
||
// Уведомляем других участников об отключении
|
||
this.broadcastToChatExcludeUser(chat_id, user_id, 'user_left', {
|
||
chat_id,
|
||
user: client.user_info.profile,
|
||
timestamp: new Date()
|
||
});
|
||
|
||
// Если чат пуст, удаляем его
|
||
if (this.chatParticipants.get(chat_id).size === 0) {
|
||
this.chatParticipants.delete(chat_id);
|
||
}
|
||
}
|
||
});
|
||
|
||
// Удаляем клиента
|
||
this.connectedClients.delete(user_id);
|
||
}
|
||
|
||
// Добавление события в очередь пользователя
|
||
addEventToQueue(user_id, event, data) {
|
||
if (!this.userEventQueues.has(user_id)) {
|
||
this.userEventQueues.set(user_id, []);
|
||
}
|
||
|
||
const eventQueue = this.userEventQueues.get(user_id);
|
||
const eventId = ++this.eventIdCounter;
|
||
|
||
eventQueue.push({
|
||
id: eventId,
|
||
event,
|
||
data,
|
||
timestamp: new Date()
|
||
});
|
||
|
||
// Ограничиваем размер очереди (последние 100 событий)
|
||
if (eventQueue.length > 100) {
|
||
eventQueue.splice(0, eventQueue.length - 100);
|
||
}
|
||
}
|
||
|
||
// Рассылка события всем участникам чата
|
||
broadcastToChat(chat_id, event, data) {
|
||
const participants = this.chatParticipants.get(chat_id);
|
||
if (!participants) return;
|
||
|
||
participants.forEach(user_id => {
|
||
this.addEventToQueue(user_id, event, data);
|
||
});
|
||
}
|
||
|
||
// Рассылка события всем участникам чата кроме отправителя
|
||
broadcastToChatExcludeUser(chat_id, exclude_user_id, event, data) {
|
||
const participants = this.chatParticipants.get(chat_id);
|
||
if (!participants) return;
|
||
|
||
participants.forEach(user_id => {
|
||
if (user_id !== exclude_user_id) {
|
||
this.addEventToQueue(user_id, event, data);
|
||
}
|
||
});
|
||
}
|
||
|
||
// Получение списка онлайн пользователей в чате
|
||
getOnlineUsersInChat(chat_id) {
|
||
const participants = this.chatParticipants.get(chat_id) || new Set();
|
||
const onlineUsers = [];
|
||
const now = new Date();
|
||
const ONLINE_THRESHOLD = 2 * 60 * 1000; // 2 минуты
|
||
|
||
participants.forEach(user_id => {
|
||
const client = this.connectedClients.get(user_id);
|
||
if (client && (now - client.lastActivity) < ONLINE_THRESHOLD) {
|
||
onlineUsers.push(client.user_info.profile);
|
||
}
|
||
});
|
||
|
||
return onlineUsers;
|
||
}
|
||
|
||
// Отправка системного сообщения в чат
|
||
async sendSystemMessage(chat_id, text) {
|
||
this.broadcastToChat(chat_id, 'system_message', {
|
||
chat_id,
|
||
text,
|
||
timestamp: new Date()
|
||
});
|
||
}
|
||
|
||
// Очистка старых событий
|
||
cleanupOldEvents() {
|
||
const now = new Date();
|
||
const MAX_EVENT_AGE = 24 * 60 * 60 * 1000; // 24 часа
|
||
const INACTIVE_USER_THRESHOLD = 60 * 60 * 1000; // 1 час
|
||
|
||
// Очищаем старые события
|
||
this.userEventQueues.forEach((eventQueue, user_id) => {
|
||
const filteredEvents = eventQueue.filter(event =>
|
||
(now - event.timestamp) < MAX_EVENT_AGE
|
||
);
|
||
|
||
if (filteredEvents.length !== eventQueue.length) {
|
||
this.userEventQueues.set(user_id, filteredEvents);
|
||
}
|
||
});
|
||
|
||
// Удаляем неактивных пользователей
|
||
this.connectedClients.forEach((client, user_id) => {
|
||
if ((now - client.lastActivity) > INACTIVE_USER_THRESHOLD) {
|
||
this.handleClientDisconnect(user_id);
|
||
this.userEventQueues.delete(user_id);
|
||
}
|
||
});
|
||
}
|
||
|
||
// Тестирование Real-time подписки
|
||
async testRealtimeConnection() {
|
||
try {
|
||
const supabase = getSupabaseClient();
|
||
if (!supabase) {
|
||
return false;
|
||
}
|
||
|
||
// Создаем тестовый канал для проверки подключения
|
||
const testChannel = supabase
|
||
.channel('test_connection')
|
||
.subscribe((status, error) => {
|
||
if (error) {
|
||
console.error('❌ [Supabase] Тестовый канал - ошибка:', error);
|
||
}
|
||
|
||
if (status === 'SUBSCRIBED') {
|
||
// Отписываемся от тестового канала
|
||
setTimeout(() => {
|
||
testChannel.unsubscribe();
|
||
}, 2000);
|
||
}
|
||
});
|
||
|
||
return true;
|
||
} catch (error) {
|
||
console.error('❌ [Supabase] Ошибка тестирования Realtime:', error);
|
||
return false;
|
||
}
|
||
}
|
||
|
||
// Проверка статуса подписки
|
||
checkSubscriptionStatus() {
|
||
if (this.realtimeSubscription) {
|
||
return true;
|
||
} else {
|
||
return false;
|
||
}
|
||
}
|
||
|
||
setupRealtimeSubscription() {
|
||
// Убираем setTimeout, вызываем сразу
|
||
this._doSetupRealtimeSubscription();
|
||
}
|
||
|
||
_doSetupRealtimeSubscription() {
|
||
try {
|
||
const supabase = getSupabaseClient();
|
||
|
||
if (!supabase) {
|
||
console.log('❌ [Supabase] Supabase клиент не найден');
|
||
throw new Error('Supabase client not available');
|
||
}
|
||
|
||
// Подписываемся на изменения в таблице messages
|
||
const subscription = supabase
|
||
.channel('messages_changes')
|
||
.on(
|
||
'postgres_changes',
|
||
{
|
||
event: 'INSERT',
|
||
schema: 'public',
|
||
table: 'messages'
|
||
},
|
||
async (payload) => {
|
||
try {
|
||
const newMessage = payload.new;
|
||
if (!newMessage) {
|
||
return;
|
||
}
|
||
|
||
if (!newMessage.chat_id) {
|
||
return;
|
||
}
|
||
|
||
console.log(`📡 [Supabase Real-time] === ПОЛУЧЕНО НОВОЕ СООБЩЕНИЕ ===`);
|
||
console.log(`📡 [Supabase Real-time] ID сообщения: ${newMessage.id}`);
|
||
console.log(`📡 [Supabase Real-time] Чат ID: ${newMessage.chat_id}`);
|
||
console.log(`📡 [Supabase Real-time] Пользователь ID: ${newMessage.user_id}`);
|
||
console.log(`📡 [Supabase Real-time] Текст: "${newMessage.text?.substring(0, 100)}${(newMessage.text?.length || 0) > 100 ? '...' : ''}"`);
|
||
console.log(`📡 [Supabase Real-time] Время: ${new Date().toISOString()}`);
|
||
|
||
// Получаем профиль пользователя
|
||
const { data: userProfile, error: profileError } = await supabase
|
||
.from('user_profiles')
|
||
.select('id, full_name, avatar_url')
|
||
.eq('id', newMessage.user_id)
|
||
.single();
|
||
|
||
if (profileError) {
|
||
console.error('❌ [Supabase] Ошибка получения профиля пользователя:', profileError);
|
||
} else {
|
||
console.log(`✅ [Supabase Real-time] Профиль пользователя получен: ${userProfile.full_name || 'No name'}`);
|
||
}
|
||
|
||
// Объединяем сообщение с профилем
|
||
const messageWithProfile = {
|
||
...newMessage,
|
||
user_profiles: userProfile || null
|
||
};
|
||
|
||
// Отправляем сообщение всем участникам чата
|
||
console.log(`📤 [Supabase Real-time] Отправляем сообщение участникам чата ${newMessage.chat_id}...`);
|
||
this.broadcastToChat(newMessage.chat_id, 'new_message', {
|
||
message: messageWithProfile,
|
||
timestamp: new Date()
|
||
});
|
||
console.log(`✅ [Supabase Real-time] Сообщение отправлено участникам чата`);
|
||
|
||
// === ЗАПУСК МОДЕРАЦИИ ===
|
||
if (MODERATION_CONFIG.MODERATION_ENABLED) {
|
||
console.log(`🛡️ [Supabase Real-time] Модерация включена - планируем проверку сообщения`);
|
||
console.log(`🛡️ [Supabase Real-time] Задержка модерации: ${MODERATION_CONFIG.MODERATION_DELAY}мс`);
|
||
|
||
if (MODERATION_CONFIG.MODERATION_DELAY === 0) {
|
||
console.log(`⚡ [Supabase Real-time] Мгновенная модерация - запускаем setImmediate`);
|
||
setImmediate(() => {
|
||
console.log(`⚡ [Supabase Real-time] setImmediate: запускаем модерацию сообщения ${newMessage.id}`);
|
||
this.moderateMessage(newMessage.id, newMessage.text, newMessage.chat_id);
|
||
});
|
||
} else {
|
||
console.log(`⏰ [Supabase Real-time] Отложенная модерация - устанавливаем setTimeout`);
|
||
const timeoutId = setTimeout(() => {
|
||
console.log(`⏰ [Supabase Real-time] setTimeout: время пришло, запускаем модерацию сообщения ${newMessage.id}`);
|
||
console.log(`⏰ [Supabase Real-time] Фактическое время: ${new Date().toISOString()}`);
|
||
this.moderateMessage(newMessage.id, newMessage.text, newMessage.chat_id);
|
||
}, MODERATION_CONFIG.MODERATION_DELAY);
|
||
|
||
console.log(`⏰ [Supabase Real-time] Timeout ID: ${timeoutId}`);
|
||
console.log(`⏰ [Supabase Real-time] Ожидаемое время срабатывания: ${new Date(Date.now() + MODERATION_CONFIG.MODERATION_DELAY).toISOString()}`);
|
||
}
|
||
|
||
console.log(`🛡️ [Supabase Real-time] Модерация запланирована для сообщения ${newMessage.id}`);
|
||
} else {
|
||
console.log(`🔓 [Supabase Real-time] Модерация отключена - сообщение не будет проверяться`);
|
||
}
|
||
|
||
} catch (callbackError) {
|
||
console.error('❌ [Supabase] Ошибка в обработчике сообщения:', callbackError);
|
||
console.error('❌ [Supabase] Stack trace:', callbackError.stack);
|
||
}
|
||
}
|
||
)
|
||
.on(
|
||
'postgres_changes',
|
||
{
|
||
event: 'UPDATE',
|
||
schema: 'public',
|
||
table: 'messages'
|
||
},
|
||
async (payload) => {
|
||
try {
|
||
const updatedMessage = payload.new;
|
||
if (!updatedMessage) {
|
||
return;
|
||
}
|
||
|
||
if (!updatedMessage.chat_id) {
|
||
return;
|
||
}
|
||
|
||
console.log(`🔄 [Supabase Real-time] === ПОЛУЧЕНО ОБНОВЛЕНИЕ СООБЩЕНИЯ ===`);
|
||
console.log(`🔄 [Supabase Real-time] ID сообщения: ${updatedMessage.id}`);
|
||
console.log(`🔄 [Supabase Real-time] Чат ID: ${updatedMessage.chat_id}`);
|
||
console.log(`🔄 [Supabase Real-time] Пользователь ID: ${updatedMessage.user_id}`);
|
||
console.log(`🔄 [Supabase Real-time] Обновленный текст: "${updatedMessage.text?.substring(0, 100)}${(updatedMessage.text?.length || 0) > 100 ? '...' : ''}"`);
|
||
console.log(`🔄 [Supabase Real-time] Время обновления: ${new Date().toISOString()}`);
|
||
|
||
// Получаем профиль пользователя
|
||
const { data: userProfile, error: profileError } = await supabase
|
||
.from('user_profiles')
|
||
.select('id, full_name, avatar_url')
|
||
.eq('id', updatedMessage.user_id)
|
||
.single();
|
||
|
||
if (profileError) {
|
||
console.error('❌ [Supabase] Ошибка получения профиля пользователя:', profileError);
|
||
}
|
||
|
||
// Объединяем сообщение с профилем
|
||
const messageWithProfile = {
|
||
...updatedMessage,
|
||
user_profiles: userProfile || null
|
||
};
|
||
|
||
// Отправляем обновление всем участникам чата
|
||
console.log(`📤 [Supabase Real-time] Отправляем обновление участникам чата ${updatedMessage.chat_id}...`);
|
||
this.broadcastToChat(updatedMessage.chat_id, 'message_updated', {
|
||
message: messageWithProfile,
|
||
timestamp: new Date()
|
||
});
|
||
console.log(`✅ [Supabase Real-time] Обновление отправлено участникам чата`);
|
||
console.log(`📊 [Supabase Real-time] Событие: message_updated`);
|
||
|
||
} catch (callbackError) {
|
||
console.error('❌ [Supabase] Ошибка в обработчике обновления сообщения:', callbackError);
|
||
}
|
||
}
|
||
)
|
||
.subscribe((status, error) => {
|
||
if (error) {
|
||
console.error('❌ [Supabase] Ошибка подписки:', error);
|
||
}
|
||
|
||
if (status === 'CHANNEL_ERROR') {
|
||
console.error('❌ [Supabase] Ошибка канала');
|
||
} else if (status === 'TIMED_OUT') {
|
||
console.error('❌ [Supabase] Таймаут подписки');
|
||
}
|
||
});
|
||
|
||
// Сохраняем ссылку на подписку для возможности отписки
|
||
this.realtimeSubscription = subscription;
|
||
|
||
} catch (error) {
|
||
console.error('❌ [Supabase] Критическая ошибка при настройке подписки:', error);
|
||
throw error; // Пробрасываем ошибку для обработки в initializeWithRetry
|
||
}
|
||
}
|
||
|
||
// Функция отложенной модерации сообщения
|
||
async moderateMessage(messageId, messageText, chatId) {
|
||
const moderationStartTime = Date.now();
|
||
|
||
try {
|
||
console.log(`🔍 [Moderation] === НАЧАЛО МОДЕРАЦИИ СООБЩЕНИЯ ${messageId} ===`);
|
||
console.log(`🔍 [Moderation] Chat ID: ${chatId}`);
|
||
console.log(`🔍 [Moderation] Длина текста: ${messageText.length} символов`);
|
||
console.log(`🔍 [Moderation] Превью текста: "${messageText.length > 100 ? messageText.substring(0, 100) + '...' : messageText}"`);
|
||
console.log(`🔍 [Moderation] Время запуска: ${new Date().toISOString()}`);
|
||
|
||
// Вызываем функцию модерации
|
||
console.log(`🔍 [Moderation] Передаем сообщение AI агенту для анализа...`);
|
||
console.log(`🔍 [Moderation] Функция moderationText доступна: ${typeof moderationText}`);
|
||
console.log(`🔍 [Moderation] Тип сообщения: ${typeof messageText}`);
|
||
console.log(`🔍 [Moderation] Текст сообщения: "${messageText}"`);
|
||
|
||
let comment, isApproved, finalMessage;
|
||
try {
|
||
const result = await moderationText('', messageText);
|
||
console.log(`🔍 [Moderation] Результат от AI агента получен:`, result);
|
||
[comment, isApproved, finalMessage] = result;
|
||
console.log(`🔍 [Moderation] Распакованные значения: comment="${comment}", isApproved=${isApproved}, finalMessage="${finalMessage}"`);
|
||
} catch (moderationError) {
|
||
console.error(`❌ [Moderation] Ошибка при вызове AI агента:`, moderationError);
|
||
console.error(`❌ [Moderation] Stack trace:`, moderationError.stack);
|
||
// В случае ошибки одобряем сообщение
|
||
comment = '';
|
||
isApproved = true;
|
||
finalMessage = messageText;
|
||
console.log(`⚠️ [Moderation] Используем fallback значения из-за ошибки`);
|
||
}
|
||
|
||
const moderationTime = Date.now() - moderationStartTime;
|
||
console.log(`📝 [Moderation] === РЕЗУЛЬТАТ МОДЕРАЦИИ СООБЩЕНИЯ ${messageId} ===`);
|
||
console.log(`📝 [Moderation] Время модерации: ${moderationTime}мс`);
|
||
console.log(`📝 [Moderation] Решение: ${isApproved ? '✅ ОДОБРЕНО' : '❌ ОТКЛОНЕНО'}`);
|
||
console.log(`📝 [Moderation] Комментарий: "${comment || 'отсутствует'}"`);
|
||
console.log(`📝 [Moderation] Финальный текст: "${finalMessage}"`);
|
||
|
||
if (isApproved) {
|
||
console.log(`📝 [Moderation] Действие: сообщение остается без изменений`);
|
||
} else {
|
||
console.log(`📝 [Moderation] Действие: сообщение будет заменено в базе данных`);
|
||
}
|
||
|
||
// Если сообщение не прошло модерацию, обновляем его в базе данных
|
||
if (!isApproved) {
|
||
console.log(`💾 [Moderation] Начинаем обновление сообщения в базе данных...`);
|
||
|
||
const supabase = getSupabaseClient();
|
||
|
||
// Сначала получаем информацию о сообщении для получения chat_id
|
||
console.log(`💾 [Moderation] Получаем данные сообщения из базы...`);
|
||
const { data: messageData, error: fetchError } = await supabase
|
||
.from('messages')
|
||
.select('chat_id, user_id')
|
||
.eq('id', messageId)
|
||
.single();
|
||
|
||
if (fetchError) {
|
||
console.error(`❌ [Moderation] Ошибка получения данных сообщения ${messageId}:`, fetchError);
|
||
return;
|
||
}
|
||
|
||
console.log(`💾 [Moderation] Данные получены. Chat ID: ${messageData.chat_id}, User ID: ${messageData.user_id}`);
|
||
|
||
// Обновляем текст сообщения
|
||
console.log(`💾 [Moderation] Обновляем текст сообщения на: "${MODERATION_CONFIG.BLOCKED_MESSAGE_TEXT}"`);
|
||
const { data: updatedMessage, error } = await supabase
|
||
.from('messages')
|
||
.update({ text: MODERATION_CONFIG.BLOCKED_MESSAGE_TEXT })
|
||
.eq('id', messageId)
|
||
.select('*')
|
||
.single();
|
||
|
||
if (error) {
|
||
console.error(`❌ [Moderation] Ошибка обновления сообщения ${messageId}:`, error);
|
||
console.error(`❌ [Moderation] Детали ошибки:`, error);
|
||
} else {
|
||
console.log(`✅ [Moderation] Сообщение ${messageId} успешно обновлено в базе данных`);
|
||
console.log(`✅ [Moderation] Старый текст заменен на: "${updatedMessage.text}"`);
|
||
console.log(`✅ [Moderation] Время обновления: ${updatedMessage.updated_at || 'не указано'}`);
|
||
}
|
||
} else {
|
||
console.log(`✅ [Moderation] Сообщение ${messageId} прошло модерацию - никаких действий не требуется`);
|
||
}
|
||
|
||
const totalTime = Date.now() - moderationStartTime;
|
||
console.log(`🔍 [Moderation] === МОДЕРАЦИЯ СООБЩЕНИЯ ${messageId} ЗАВЕРШЕНА ===`);
|
||
console.log(`🔍 [Moderation] Общее время процесса: ${totalTime}мс`);
|
||
console.log(`🔍 [Moderation] Время завершения: ${new Date().toISOString()}`);
|
||
|
||
} catch (error) {
|
||
const totalTime = Date.now() - moderationStartTime;
|
||
console.error(`❌ [Moderation] === ОШИБКА МОДЕРАЦИИ СООБЩЕНИЯ ${messageId} ===`);
|
||
console.error(`❌ [Moderation] Время до ошибки: ${totalTime}мс`);
|
||
console.error(`❌ [Moderation] Тип ошибки: ${error.name || 'Unknown'}`);
|
||
console.error(`❌ [Moderation] Сообщение ошибки: ${error.message || 'Unknown error'}`);
|
||
console.error(`❌ [Moderation] Stack trace:`, error.stack);
|
||
}
|
||
}
|
||
|
||
// Получение статистики подключений
|
||
getConnectionStats() {
|
||
return {
|
||
connectedClients: this.connectedClients.size,
|
||
activeChats: this.chatParticipants.size,
|
||
totalChatParticipants: Array.from(this.chatParticipants.values())
|
||
.reduce((total, participants) => total + participants.size, 0),
|
||
totalEventQueues: this.userEventQueues.size,
|
||
totalEvents: Array.from(this.userEventQueues.values())
|
||
.reduce((total, queue) => total + queue.length, 0)
|
||
};
|
||
}
|
||
}
|
||
|
||
// Функция для создания роутера с polling эндпоинтами
|
||
function createChatPollingRouter(express) {
|
||
const router = express.Router();
|
||
const chatHandler = new ChatPollingHandler();
|
||
|
||
// CORS middleware для всех запросов
|
||
router.use((req, res, next) => {
|
||
res.header('Access-Control-Allow-Origin', '*');
|
||
res.header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS');
|
||
res.header('Access-Control-Allow-Headers', 'Content-Type, Cache-Control, Authorization');
|
||
res.header('Access-Control-Allow-Credentials', 'true');
|
||
|
||
// Обрабатываем OPTIONS запросы
|
||
if (req.method === 'OPTIONS') {
|
||
res.status(200).end();
|
||
return;
|
||
}
|
||
|
||
next();
|
||
});
|
||
|
||
// Эндпоинт для аутентификации
|
||
router.post('/auth', (req, res) => {
|
||
chatHandler.handleAuthentication(req, res);
|
||
});
|
||
|
||
// Эндпоинт для получения событий (polling)
|
||
router.get('/events', (req, res) => {
|
||
chatHandler.handleGetEvents(req, res);
|
||
});
|
||
|
||
// HTTP эндпоинты для действий
|
||
router.post('/join-chat', (req, res) => {
|
||
chatHandler.handleJoinChat(req, res);
|
||
});
|
||
|
||
router.post('/leave-chat', (req, res) => {
|
||
chatHandler.handleLeaveChat(req, res);
|
||
});
|
||
|
||
router.post('/send-message', (req, res) => {
|
||
chatHandler.handleSendMessage(req, res);
|
||
});
|
||
|
||
router.post('/typing-start', (req, res) => {
|
||
chatHandler.handleTypingStart(req, res);
|
||
});
|
||
|
||
router.post('/typing-stop', (req, res) => {
|
||
chatHandler.handleTypingStop(req, res);
|
||
});
|
||
|
||
// Эндпоинт для получения онлайн пользователей в чате
|
||
router.get('/online-users/:chat_id', (req, res) => {
|
||
const { chat_id } = req.params;
|
||
const onlineUsers = chatHandler.getOnlineUsersInChat(chat_id);
|
||
res.json({ onlineUsers });
|
||
});
|
||
|
||
// Эндпоинт для получения статистики
|
||
router.get('/stats', (req, res) => {
|
||
const stats = chatHandler.getConnectionStats();
|
||
res.json(stats);
|
||
});
|
||
|
||
// Эндпоинт для проверки статуса Supabase подписки
|
||
router.get('/supabase-status', (req, res) => {
|
||
const isConnected = chatHandler.checkSubscriptionStatus();
|
||
res.json({
|
||
supabaseSubscriptionActive: isConnected,
|
||
subscriptionExists: !!chatHandler.realtimeSubscription,
|
||
subscriptionInfo: chatHandler.realtimeSubscription ? {
|
||
channel: chatHandler.realtimeSubscription.topic,
|
||
state: chatHandler.realtimeSubscription.state
|
||
} : null
|
||
});
|
||
});
|
||
|
||
// Эндпоинт для принудительного переподключения к Supabase
|
||
router.post('/reconnect-supabase', (req, res) => {
|
||
try {
|
||
// Отписываемся от текущей подписки
|
||
if (chatHandler.realtimeSubscription) {
|
||
chatHandler.realtimeSubscription.unsubscribe();
|
||
chatHandler.realtimeSubscription = null;
|
||
}
|
||
|
||
// Создаем новую подписку
|
||
chatHandler.setupRealtimeSubscription();
|
||
|
||
res.json({
|
||
success: true,
|
||
message: 'Reconnection initiated'
|
||
});
|
||
} catch (error) {
|
||
console.error('❌ [Polling Server] Ошибка переподключения:', error);
|
||
res.status(500).json({
|
||
success: false,
|
||
error: 'Reconnection failed',
|
||
details: error.message
|
||
});
|
||
}
|
||
});
|
||
|
||
// Тестовый эндпоинт для создания сообщения в обход API
|
||
router.post('/test-message', async (req, res) => {
|
||
const { chat_id, user_id, text } = req.body;
|
||
|
||
if (!chat_id || !user_id || !text) {
|
||
res.status(400).json({ error: 'chat_id, user_id и text обязательны' });
|
||
return;
|
||
}
|
||
|
||
try {
|
||
// Создаем тестовое событие напрямую
|
||
chatHandler.broadcastToChat(chat_id, 'new_message', {
|
||
message: {
|
||
id: `test_${Date.now()}`,
|
||
chat_id,
|
||
user_id,
|
||
text,
|
||
created_at: new Date().toISOString(),
|
||
user_profiles: {
|
||
id: user_id,
|
||
full_name: 'Test User',
|
||
avatar_url: null
|
||
}
|
||
},
|
||
timestamp: new Date()
|
||
});
|
||
|
||
res.json({
|
||
success: true,
|
||
message: 'Test message sent to polling clients'
|
||
});
|
||
} catch (error) {
|
||
console.error('❌ [Polling Server] Ошибка отправки тестового сообщения:', error);
|
||
res.status(500).json({
|
||
success: false,
|
||
error: 'Failed to send test message',
|
||
details: error.message
|
||
});
|
||
}
|
||
});
|
||
|
||
return { router, chatHandler };
|
||
}
|
||
|
||
module.exports = {
|
||
ChatPollingHandler,
|
||
createChatPollingRouter
|
||
};
|