From caf82a46a831f4f1827d01939b880c2199d12aa0 Mon Sep 17 00:00:00 2001 From: Adhiraj Singh Date: Tue, 8 Dec 2020 18:06:20 +0530 Subject: [PATCH] Indexing change + missing messages detection --- src/Tests/Tests.Misc.ts | 39 +++++++++++++++++++++- src/WAConnection/4.Events.ts | 51 +++++++++++++++++++++++------ src/WAConnection/5.User.ts | 2 +- src/WAConnection/7.MessagesExtra.ts | 37 +++++++++++++-------- src/WAConnection/Constants.ts | 2 ++ 5 files changed, 105 insertions(+), 26 deletions(-) diff --git a/src/Tests/Tests.Misc.ts b/src/Tests/Tests.Misc.ts index 355d6cd..eacbc62 100644 --- a/src/Tests/Tests.Misc.ts +++ b/src/Tests/Tests.Misc.ts @@ -1,4 +1,4 @@ -import { Presence, ChatModification, delay } from '../WAConnection/WAConnection' +import { Presence, ChatModification, delay, newMessagesDB } from '../WAConnection/WAConnection' import { promises as fs } from 'fs' import * as assert from 'assert' import fetch from 'node-fetch' @@ -156,4 +156,41 @@ WAConnectionTest('Misc', (conn) => { conn.generateLinkPreview ('I sent links to https://teachyourselfcs.com/ and https://www.fast.ai/') ) }) + // this test requires quite a few messages with the test JID + it('should detect overlaps and clear messages accordingly', async () => { + // wait for chats + await new Promise(resolve => conn.once('chats-received', resolve)) + + conn.maxCachedMessages = 100 + + const chat = conn.chats.get(testJid) + const oldCount = chat.messages.length + console.log(`test chat has ${oldCount} pre-loaded messages`) + // load 100 messages + await conn.loadMessages(testJid, 100, undefined) + assert.strictEqual(chat.messages.length, 100) + + conn.close() + // remove all latest messages + chat.messages = newMessagesDB( chat.messages.all().slice(0, 20) ) + + const task = new Promise((resolve, reject) => ( + conn.on('chats-received', ({ hasReceivedLastMessage, chatsWithMissingMessages }) => { + if (hasReceivedLastMessage) { + assert.strictEqual(Object.keys(chatsWithMissingMessages).length, 1) + const missing = chatsWithMissingMessages.find(({ jid }) => jid === testJid) + assert.ok(missing, 'missing message not detected') + assert.strictEqual( + conn.chats.get(testJid).messages.length, + missing.count + ) + assert.strictEqual(missing.count, oldCount) + resolve() + } + }) + )) + + await conn.connect() + await task + }) }) \ No newline at end of file diff --git a/src/WAConnection/4.Events.ts b/src/WAConnection/4.Events.ts index 458d578..156b1ce 100644 --- a/src/WAConnection/4.Events.ts +++ b/src/WAConnection/4.Events.ts @@ -64,8 +64,15 @@ export class WAConnection extends Base { }) // we store these last messages const lastMessages = {} + // keep track of overlaps, + // if there are no overlaps of messages and we had messages present, we clear the previous messages + // this prevents missing messages in conversations + let overlaps: { [_: string]: { requiresOverlap: boolean, didOverlap?: boolean } } = {} // messages received - const messagesUpdate = (json, style: 'prepend' | 'append') => { + const messagesUpdate = (json, style: 'previous' | 'last') => { + if (style === 'last') { + overlaps = {} + } const isLast = json[1].last const messages = json[2] as WANode[] if (messages) { @@ -74,17 +81,26 @@ export class WAConnection extends Base { const jid = message.key.remoteJid const chat = this.chats.get(jid) const mKeyID = WA_MESSAGE_ID(message) - if (chat && !chat.messages.get(mKeyID)) { - if (style === 'prepend') { + if (chat) { + if (style === 'previous') { const fm = chat.messages.get(lastMessages[jid]) if (!fm) return const prevEpoch = fm['epoch'] message['epoch'] = prevEpoch-1 - } else if (style === 'append') { + } else if (style === 'last') { + // no overlap required, if there were no previous messages + overlaps[jid] = { requiresOverlap: chat.messages.length > 0 } + const lm = chat.messages.all()[chat.messages.length-1] const prevEpoch = (lm && lm['epoch']) || 0 - message['epoch'] = prevEpoch+100 // hacky way to allow more previous messages + // hacky way to allow more previous messages + message['epoch'] = prevEpoch+1000 } + if (chat.messages.get(mKeyID)) { + chat.messages.delete(message) + overlaps[jid] = { ...(overlaps[jid] || { requiresOverlap: true }), didOverlap: true } + } + chat.messages.insert (message) updates[jid] = updates[jid] || newMessagesDB() @@ -100,12 +116,27 @@ export class WAConnection extends Base { } } if (isLast) { - this.emit('chats-received', { hasReceivedLastMessage: true }) + // find which chats had missing messages + // list out all the jids, and how many messages we've cached now + const chatsWithMissingMessages = Object.keys(overlaps).map(jid => { + // if there was no overlap, delete previous messages + if (!overlaps[jid].didOverlap && overlaps[jid].requiresOverlap) { + this.logger.debug(`received messages for ${jid}, but did not overlap with previous messages, clearing...`) + const chat = this.chats.get(jid) + if (chat) { + const message = chat.messages.get(lastMessages[jid]) + const remainingMessages = chat.messages.paginatedByValue(message, this.maxCachedMessages, undefined, 'after') + chat.messages = newMessagesDB([message, ...remainingMessages]) + return { jid, count: chat.messages.length } // return number of messages we've left + } + } + }).filter(Boolean) + this.emit('chats-received', { hasReceivedLastMessage: true, chatsWithMissingMessages }) } } - this.on('CB:action,add:last', json => messagesUpdate(json, 'append')) - this.on('CB:action,add:before', json => messagesUpdate(json, 'prepend')) - this.on('CB:action,add:unread', json => messagesUpdate(json, 'prepend')) + this.on('CB:action,add:last', json => messagesUpdate(json, 'last')) + this.on('CB:action,add:before', json => messagesUpdate(json, 'previous')) + this.on('CB:action,add:unread', json => messagesUpdate(json, 'previous')) // contacts received this.on('CB:response,type:contacts', json => { @@ -575,7 +606,7 @@ export class WAConnection extends Base { /** when contacts are sent by WA */ on (event: 'contacts-received', listener: () => void): this /** when chats are sent by WA, and when all messages are received */ - on (event: 'chats-received', listener: (update: {hasNewChats?: boolean, hasReceivedLastMessage?: boolean}) => void): this + on (event: 'chats-received', listener: (update: {hasNewChats?: boolean, hasReceivedLastMessage?: boolean, chatsWithMissingMessages: { jid: string, count: number }[] }) => void): this /** when multiple chats are updated (new message, updated message, deleted, pinned, etc) */ on (event: 'chats-update', listener: (chats: WAChatUpdate[]) => void): this /** when a chat is updated (new message, updated message, deleted, pinned, presence updated etc) */ diff --git a/src/WAConnection/5.User.ts b/src/WAConnection/5.User.ts index 93b6624..edf6941 100644 --- a/src/WAConnection/5.User.ts +++ b/src/WAConnection/5.User.ts @@ -1,5 +1,5 @@ import {WAConnection as Base} from './4.Events' -import { Presence, WABroadcastListInfo, WAProfilePictureChange, WALoadChatOptions } from './Constants' +import { Presence, WABroadcastListInfo, WAProfilePictureChange, WALoadChatOptions, WAChatIndex } from './Constants' import { WAMessage, WANode, diff --git a/src/WAConnection/7.MessagesExtra.ts b/src/WAConnection/7.MessagesExtra.ts index e261554..993039b 100644 --- a/src/WAConnection/7.MessagesExtra.ts +++ b/src/WAConnection/7.MessagesExtra.ts @@ -1,5 +1,5 @@ import {WAConnection as Base} from './6.MessagesSend' -import { MessageType, WAMessageKey, MessageInfo, WAMessageContent, WAMetric, WAFlag, WANode, WAMessage, WAMessageProto, ChatModification, BaileysError } from './Constants' +import { MessageType, WAMessageKey, MessageInfo, WAMessageContent, WAMetric, WAFlag, WANode, WAMessage, WAMessageProto, ChatModification, BaileysError, WAChatIndex } from './Constants' import { whatsappID, delay, toNumber, unixTimestampSeconds, GET_MESSAGE_ID, WA_MESSAGE_ID, isGroupID } from './Utils' import { Mutex } from './Mutex' @@ -19,7 +19,7 @@ export class WAConnection extends Base { @Mutex ((jid, messageID) => jid+messageID) async messageInfo (jid: string, messageID: string) { const query = ['query', {type: 'message_info', index: messageID, jid: jid, epoch: this.msgCount.toString()}, null] - const response = (await this.query ({json: query, binaryTags: [22, WAFlag.ignore], expect200: true}))[2] + const [,,response] = await this.query ({json: query, binaryTags: [WAMetric.queryRead, WAFlag.ignore], expect200: true}) const info: MessageInfo = {reads: [], deliveries: []} if (response) { @@ -44,10 +44,12 @@ export class WAConnection extends Base { jid = whatsappID (jid) const chat = this.assertChatGet (jid) - const message = (await this.loadMessages(jid, 1)).messages[0] - const count = type === 'unread' ? -2 : Math.abs(chat.count) + const count = type === 'unread' ? '-2' : Math.abs(chat.count).toString() if (type === 'unread' || chat.count !== 0) { - await this.sendReadReceipt (jid, message.key, count) + const idx = await this.getChatIndex(jid) + await this.setQuery ([ + ['read', { jid, count, ...idx, participant: undefined }, null] + ], [ WAMetric.read, WAFlag.ignore ]) } chat.count = type === 'unread' ? -1 : 0 this.emit ('chat-update', {jid, count: chat.count}) @@ -55,6 +57,7 @@ export class WAConnection extends Base { /** * Sends a read receipt for a given message; * does not update the chat do @see chatRead + * @deprecated just use chatRead() * @param jid the ID of the person/group whose message you want to mark read * @param messageKey the key of the message * @param count number of messages to read, set to < 0 to unread a message @@ -124,7 +127,7 @@ export class WAConnection extends Base { const m = extra[i] fepoch -= 1 m['epoch'] = fepoch - + if(chat.messages.length < this.maxCachedMessages && !chat.messages.get (WA_MESSAGE_ID(m))) { chat.messages.insert(m) } @@ -374,14 +377,8 @@ export class WAConnection extends Base { break default: chatAttrs.type = type - const msg = (await this.loadMessages(jid, 1)).messages[0] - if (msg) { - chatAttrs.index = msg.key.id - chatAttrs.owner = msg.key.fromMe.toString() - } - if (isGroupID(jid)) { - chatAttrs.participant = whatsappID(msg.participant || msg.key.participant) - } + const index = await this.getChatIndex(jid) + chatAttrs = { ...chatAttrs, ...index } break } @@ -399,4 +396,16 @@ export class WAConnection extends Base { } return response } + protected async getChatIndex (jid: string): Promise { + const chatAttrs = {} as WAChatIndex + const { messages: [msg] } = await this.loadMessages(jid, 1) + if (msg) { + chatAttrs.index = msg.key.id + chatAttrs.owner = msg.key.fromMe.toString() as 'true' | 'false' + } + if (isGroupID(jid)) { + chatAttrs.participant = msg.key.fromMe ? this.user?.jid : whatsappID(msg.participant || msg.key.participant) + } + return chatAttrs + } } diff --git a/src/WAConnection/Constants.ts b/src/WAConnection/Constants.ts index 485339c..2717e80 100644 --- a/src/WAConnection/Constants.ts +++ b/src/WAConnection/Constants.ts @@ -231,6 +231,7 @@ export interface WAChat { presences?: { [k: string]: WAPresenceData } metadata?: WAGroupMetadata } +export type WAChatIndex = { index: string, owner: 'true' | 'false', participant?: string } export type WAChatUpdate = Partial & { jid: string, hasNewMessage?: boolean } export enum WAMetric { debugLog = 1, @@ -254,6 +255,7 @@ export enum WAMetric { queryGroup = 19, queryPreview = 20, queryEmoji = 21, + queryRead = 22, queryVCard = 29, queryStatus = 30, queryStatusUpdate = 31,