mirror of
https://github.com/FranP-code/Baileys.git
synced 2025-10-13 00:32:22 +00:00
Indexing change + missing messages detection
This commit is contained in:
@@ -1,4 +1,4 @@
|
|||||||
import { Presence, ChatModification, delay } from '../WAConnection/WAConnection'
|
import { Presence, ChatModification, delay, newMessagesDB } from '../WAConnection/WAConnection'
|
||||||
import { promises as fs } from 'fs'
|
import { promises as fs } from 'fs'
|
||||||
import * as assert from 'assert'
|
import * as assert from 'assert'
|
||||||
import fetch from 'node-fetch'
|
import fetch from 'node-fetch'
|
||||||
@@ -156,4 +156,41 @@ WAConnectionTest('Misc', (conn) => {
|
|||||||
conn.generateLinkPreview ('I sent links to https://teachyourselfcs.com/ and https://www.fast.ai/')
|
conn.generateLinkPreview ('I sent links to https://teachyourselfcs.com/ and https://www.fast.ai/')
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
// this test requires quite a few messages with the test JID
|
||||||
|
it('should detect overlaps and clear messages accordingly', async () => {
|
||||||
|
// wait for chats
|
||||||
|
await new Promise(resolve => conn.once('chats-received', resolve))
|
||||||
|
|
||||||
|
conn.maxCachedMessages = 100
|
||||||
|
|
||||||
|
const chat = conn.chats.get(testJid)
|
||||||
|
const oldCount = chat.messages.length
|
||||||
|
console.log(`test chat has ${oldCount} pre-loaded messages`)
|
||||||
|
// load 100 messages
|
||||||
|
await conn.loadMessages(testJid, 100, undefined)
|
||||||
|
assert.strictEqual(chat.messages.length, 100)
|
||||||
|
|
||||||
|
conn.close()
|
||||||
|
// remove all latest messages
|
||||||
|
chat.messages = newMessagesDB( chat.messages.all().slice(0, 20) )
|
||||||
|
|
||||||
|
const task = new Promise((resolve, reject) => (
|
||||||
|
conn.on('chats-received', ({ hasReceivedLastMessage, chatsWithMissingMessages }) => {
|
||||||
|
if (hasReceivedLastMessage) {
|
||||||
|
assert.strictEqual(Object.keys(chatsWithMissingMessages).length, 1)
|
||||||
|
const missing = chatsWithMissingMessages.find(({ jid }) => jid === testJid)
|
||||||
|
assert.ok(missing, 'missing message not detected')
|
||||||
|
assert.strictEqual(
|
||||||
|
conn.chats.get(testJid).messages.length,
|
||||||
|
missing.count
|
||||||
|
)
|
||||||
|
assert.strictEqual(missing.count, oldCount)
|
||||||
|
resolve()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
))
|
||||||
|
|
||||||
|
await conn.connect()
|
||||||
|
await task
|
||||||
|
})
|
||||||
})
|
})
|
||||||
@@ -64,8 +64,15 @@ export class WAConnection extends Base {
|
|||||||
})
|
})
|
||||||
// we store these last messages
|
// we store these last messages
|
||||||
const lastMessages = {}
|
const lastMessages = {}
|
||||||
|
// keep track of overlaps,
|
||||||
|
// if there are no overlaps of messages and we had messages present, we clear the previous messages
|
||||||
|
// this prevents missing messages in conversations
|
||||||
|
let overlaps: { [_: string]: { requiresOverlap: boolean, didOverlap?: boolean } } = {}
|
||||||
// messages received
|
// messages received
|
||||||
const messagesUpdate = (json, style: 'prepend' | 'append') => {
|
const messagesUpdate = (json, style: 'previous' | 'last') => {
|
||||||
|
if (style === 'last') {
|
||||||
|
overlaps = {}
|
||||||
|
}
|
||||||
const isLast = json[1].last
|
const isLast = json[1].last
|
||||||
const messages = json[2] as WANode[]
|
const messages = json[2] as WANode[]
|
||||||
if (messages) {
|
if (messages) {
|
||||||
@@ -74,17 +81,26 @@ export class WAConnection extends Base {
|
|||||||
const jid = message.key.remoteJid
|
const jid = message.key.remoteJid
|
||||||
const chat = this.chats.get(jid)
|
const chat = this.chats.get(jid)
|
||||||
const mKeyID = WA_MESSAGE_ID(message)
|
const mKeyID = WA_MESSAGE_ID(message)
|
||||||
if (chat && !chat.messages.get(mKeyID)) {
|
if (chat) {
|
||||||
if (style === 'prepend') {
|
if (style === 'previous') {
|
||||||
const fm = chat.messages.get(lastMessages[jid])
|
const fm = chat.messages.get(lastMessages[jid])
|
||||||
if (!fm) return
|
if (!fm) return
|
||||||
const prevEpoch = fm['epoch']
|
const prevEpoch = fm['epoch']
|
||||||
message['epoch'] = prevEpoch-1
|
message['epoch'] = prevEpoch-1
|
||||||
} else if (style === 'append') {
|
} else if (style === 'last') {
|
||||||
|
// no overlap required, if there were no previous messages
|
||||||
|
overlaps[jid] = { requiresOverlap: chat.messages.length > 0 }
|
||||||
|
|
||||||
const lm = chat.messages.all()[chat.messages.length-1]
|
const lm = chat.messages.all()[chat.messages.length-1]
|
||||||
const prevEpoch = (lm && lm['epoch']) || 0
|
const prevEpoch = (lm && lm['epoch']) || 0
|
||||||
message['epoch'] = prevEpoch+100 // hacky way to allow more previous messages
|
// hacky way to allow more previous messages
|
||||||
|
message['epoch'] = prevEpoch+1000
|
||||||
}
|
}
|
||||||
|
if (chat.messages.get(mKeyID)) {
|
||||||
|
chat.messages.delete(message)
|
||||||
|
overlaps[jid] = { ...(overlaps[jid] || { requiresOverlap: true }), didOverlap: true }
|
||||||
|
}
|
||||||
|
|
||||||
chat.messages.insert (message)
|
chat.messages.insert (message)
|
||||||
|
|
||||||
updates[jid] = updates[jid] || newMessagesDB()
|
updates[jid] = updates[jid] || newMessagesDB()
|
||||||
@@ -100,12 +116,27 @@ export class WAConnection extends Base {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (isLast) {
|
if (isLast) {
|
||||||
this.emit('chats-received', { hasReceivedLastMessage: true })
|
// find which chats had missing messages
|
||||||
|
// list out all the jids, and how many messages we've cached now
|
||||||
|
const chatsWithMissingMessages = Object.keys(overlaps).map(jid => {
|
||||||
|
// if there was no overlap, delete previous messages
|
||||||
|
if (!overlaps[jid].didOverlap && overlaps[jid].requiresOverlap) {
|
||||||
|
this.logger.debug(`received messages for ${jid}, but did not overlap with previous messages, clearing...`)
|
||||||
|
const chat = this.chats.get(jid)
|
||||||
|
if (chat) {
|
||||||
|
const message = chat.messages.get(lastMessages[jid])
|
||||||
|
const remainingMessages = chat.messages.paginatedByValue(message, this.maxCachedMessages, undefined, 'after')
|
||||||
|
chat.messages = newMessagesDB([message, ...remainingMessages])
|
||||||
|
return { jid, count: chat.messages.length } // return number of messages we've left
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).filter(Boolean)
|
||||||
|
this.emit('chats-received', { hasReceivedLastMessage: true, chatsWithMissingMessages })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.on('CB:action,add:last', json => messagesUpdate(json, 'append'))
|
this.on('CB:action,add:last', json => messagesUpdate(json, 'last'))
|
||||||
this.on('CB:action,add:before', json => messagesUpdate(json, 'prepend'))
|
this.on('CB:action,add:before', json => messagesUpdate(json, 'previous'))
|
||||||
this.on('CB:action,add:unread', json => messagesUpdate(json, 'prepend'))
|
this.on('CB:action,add:unread', json => messagesUpdate(json, 'previous'))
|
||||||
|
|
||||||
// contacts received
|
// contacts received
|
||||||
this.on('CB:response,type:contacts', json => {
|
this.on('CB:response,type:contacts', json => {
|
||||||
@@ -575,7 +606,7 @@ export class WAConnection extends Base {
|
|||||||
/** when contacts are sent by WA */
|
/** when contacts are sent by WA */
|
||||||
on (event: 'contacts-received', listener: () => void): this
|
on (event: 'contacts-received', listener: () => void): this
|
||||||
/** when chats are sent by WA, and when all messages are received */
|
/** when chats are sent by WA, and when all messages are received */
|
||||||
on (event: 'chats-received', listener: (update: {hasNewChats?: boolean, hasReceivedLastMessage?: boolean}) => void): this
|
on (event: 'chats-received', listener: (update: {hasNewChats?: boolean, hasReceivedLastMessage?: boolean, chatsWithMissingMessages: { jid: string, count: number }[] }) => void): this
|
||||||
/** when multiple chats are updated (new message, updated message, deleted, pinned, etc) */
|
/** when multiple chats are updated (new message, updated message, deleted, pinned, etc) */
|
||||||
on (event: 'chats-update', listener: (chats: WAChatUpdate[]) => void): this
|
on (event: 'chats-update', listener: (chats: WAChatUpdate[]) => void): this
|
||||||
/** when a chat is updated (new message, updated message, deleted, pinned, presence updated etc) */
|
/** when a chat is updated (new message, updated message, deleted, pinned, presence updated etc) */
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
import {WAConnection as Base} from './4.Events'
|
import {WAConnection as Base} from './4.Events'
|
||||||
import { Presence, WABroadcastListInfo, WAProfilePictureChange, WALoadChatOptions } from './Constants'
|
import { Presence, WABroadcastListInfo, WAProfilePictureChange, WALoadChatOptions, WAChatIndex } from './Constants'
|
||||||
import {
|
import {
|
||||||
WAMessage,
|
WAMessage,
|
||||||
WANode,
|
WANode,
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
import {WAConnection as Base} from './6.MessagesSend'
|
import {WAConnection as Base} from './6.MessagesSend'
|
||||||
import { MessageType, WAMessageKey, MessageInfo, WAMessageContent, WAMetric, WAFlag, WANode, WAMessage, WAMessageProto, ChatModification, BaileysError } from './Constants'
|
import { MessageType, WAMessageKey, MessageInfo, WAMessageContent, WAMetric, WAFlag, WANode, WAMessage, WAMessageProto, ChatModification, BaileysError, WAChatIndex } from './Constants'
|
||||||
import { whatsappID, delay, toNumber, unixTimestampSeconds, GET_MESSAGE_ID, WA_MESSAGE_ID, isGroupID } from './Utils'
|
import { whatsappID, delay, toNumber, unixTimestampSeconds, GET_MESSAGE_ID, WA_MESSAGE_ID, isGroupID } from './Utils'
|
||||||
import { Mutex } from './Mutex'
|
import { Mutex } from './Mutex'
|
||||||
|
|
||||||
@@ -19,7 +19,7 @@ export class WAConnection extends Base {
|
|||||||
@Mutex ((jid, messageID) => jid+messageID)
|
@Mutex ((jid, messageID) => jid+messageID)
|
||||||
async messageInfo (jid: string, messageID: string) {
|
async messageInfo (jid: string, messageID: string) {
|
||||||
const query = ['query', {type: 'message_info', index: messageID, jid: jid, epoch: this.msgCount.toString()}, null]
|
const query = ['query', {type: 'message_info', index: messageID, jid: jid, epoch: this.msgCount.toString()}, null]
|
||||||
const response = (await this.query ({json: query, binaryTags: [22, WAFlag.ignore], expect200: true}))[2]
|
const [,,response] = await this.query ({json: query, binaryTags: [WAMetric.queryRead, WAFlag.ignore], expect200: true})
|
||||||
|
|
||||||
const info: MessageInfo = {reads: [], deliveries: []}
|
const info: MessageInfo = {reads: [], deliveries: []}
|
||||||
if (response) {
|
if (response) {
|
||||||
@@ -44,10 +44,12 @@ export class WAConnection extends Base {
|
|||||||
jid = whatsappID (jid)
|
jid = whatsappID (jid)
|
||||||
const chat = this.assertChatGet (jid)
|
const chat = this.assertChatGet (jid)
|
||||||
|
|
||||||
const message = (await this.loadMessages(jid, 1)).messages[0]
|
const count = type === 'unread' ? '-2' : Math.abs(chat.count).toString()
|
||||||
const count = type === 'unread' ? -2 : Math.abs(chat.count)
|
|
||||||
if (type === 'unread' || chat.count !== 0) {
|
if (type === 'unread' || chat.count !== 0) {
|
||||||
await this.sendReadReceipt (jid, message.key, count)
|
const idx = await this.getChatIndex(jid)
|
||||||
|
await this.setQuery ([
|
||||||
|
['read', { jid, count, ...idx, participant: undefined }, null]
|
||||||
|
], [ WAMetric.read, WAFlag.ignore ])
|
||||||
}
|
}
|
||||||
chat.count = type === 'unread' ? -1 : 0
|
chat.count = type === 'unread' ? -1 : 0
|
||||||
this.emit ('chat-update', {jid, count: chat.count})
|
this.emit ('chat-update', {jid, count: chat.count})
|
||||||
@@ -55,6 +57,7 @@ export class WAConnection extends Base {
|
|||||||
/**
|
/**
|
||||||
* Sends a read receipt for a given message;
|
* Sends a read receipt for a given message;
|
||||||
* does not update the chat do @see chatRead
|
* does not update the chat do @see chatRead
|
||||||
|
* @deprecated just use chatRead()
|
||||||
* @param jid the ID of the person/group whose message you want to mark read
|
* @param jid the ID of the person/group whose message you want to mark read
|
||||||
* @param messageKey the key of the message
|
* @param messageKey the key of the message
|
||||||
* @param count number of messages to read, set to < 0 to unread a message
|
* @param count number of messages to read, set to < 0 to unread a message
|
||||||
@@ -124,7 +127,7 @@ export class WAConnection extends Base {
|
|||||||
const m = extra[i]
|
const m = extra[i]
|
||||||
fepoch -= 1
|
fepoch -= 1
|
||||||
m['epoch'] = fepoch
|
m['epoch'] = fepoch
|
||||||
|
|
||||||
if(chat.messages.length < this.maxCachedMessages && !chat.messages.get (WA_MESSAGE_ID(m))) {
|
if(chat.messages.length < this.maxCachedMessages && !chat.messages.get (WA_MESSAGE_ID(m))) {
|
||||||
chat.messages.insert(m)
|
chat.messages.insert(m)
|
||||||
}
|
}
|
||||||
@@ -374,14 +377,8 @@ export class WAConnection extends Base {
|
|||||||
break
|
break
|
||||||
default:
|
default:
|
||||||
chatAttrs.type = type
|
chatAttrs.type = type
|
||||||
const msg = (await this.loadMessages(jid, 1)).messages[0]
|
const index = await this.getChatIndex(jid)
|
||||||
if (msg) {
|
chatAttrs = { ...chatAttrs, ...index }
|
||||||
chatAttrs.index = msg.key.id
|
|
||||||
chatAttrs.owner = msg.key.fromMe.toString()
|
|
||||||
}
|
|
||||||
if (isGroupID(jid)) {
|
|
||||||
chatAttrs.participant = whatsappID(msg.participant || msg.key.participant)
|
|
||||||
}
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -399,4 +396,16 @@ export class WAConnection extends Base {
|
|||||||
}
|
}
|
||||||
return response
|
return response
|
||||||
}
|
}
|
||||||
|
protected async getChatIndex (jid: string): Promise<WAChatIndex> {
|
||||||
|
const chatAttrs = {} as WAChatIndex
|
||||||
|
const { messages: [msg] } = await this.loadMessages(jid, 1)
|
||||||
|
if (msg) {
|
||||||
|
chatAttrs.index = msg.key.id
|
||||||
|
chatAttrs.owner = msg.key.fromMe.toString() as 'true' | 'false'
|
||||||
|
}
|
||||||
|
if (isGroupID(jid)) {
|
||||||
|
chatAttrs.participant = msg.key.fromMe ? this.user?.jid : whatsappID(msg.participant || msg.key.participant)
|
||||||
|
}
|
||||||
|
return chatAttrs
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -231,6 +231,7 @@ export interface WAChat {
|
|||||||
presences?: { [k: string]: WAPresenceData }
|
presences?: { [k: string]: WAPresenceData }
|
||||||
metadata?: WAGroupMetadata
|
metadata?: WAGroupMetadata
|
||||||
}
|
}
|
||||||
|
export type WAChatIndex = { index: string, owner: 'true' | 'false', participant?: string }
|
||||||
export type WAChatUpdate = Partial<WAChat> & { jid: string, hasNewMessage?: boolean }
|
export type WAChatUpdate = Partial<WAChat> & { jid: string, hasNewMessage?: boolean }
|
||||||
export enum WAMetric {
|
export enum WAMetric {
|
||||||
debugLog = 1,
|
debugLog = 1,
|
||||||
@@ -254,6 +255,7 @@ export enum WAMetric {
|
|||||||
queryGroup = 19,
|
queryGroup = 19,
|
||||||
queryPreview = 20,
|
queryPreview = 20,
|
||||||
queryEmoji = 21,
|
queryEmoji = 21,
|
||||||
|
queryRead = 22,
|
||||||
queryVCard = 29,
|
queryVCard = 29,
|
||||||
queryStatus = 30,
|
queryStatus = 30,
|
||||||
queryStatusUpdate = 31,
|
queryStatusUpdate = 31,
|
||||||
|
|||||||
Reference in New Issue
Block a user