- removed timeout, use maxIdleTimeMs
- made messages a keyedDB to better utitlize message cache
- possible fix for group ID bug
This commit is contained in:
Adhiraj
2020-09-27 13:51:36 +05:30
parent 18cea74aaf
commit 3a878ae193
16 changed files with 219 additions and 121 deletions

View File

@@ -1,6 +1,6 @@
import {WAConnection as Base} from './6.MessagesSend'
import { MessageType, WAMessageKey, MessageInfo, WAMessageContent, WAMetric, WAFlag, WANode, WAMessage, WAMessageProto } from './Constants'
import { whatsappID, delay, toNumber, unixTimestampSeconds } from './Utils'
import { whatsappID, delay, toNumber, unixTimestampSeconds, GET_MESSAGE_ID, WA_MESSAGE_ID } from './Utils'
import { Mutex } from './Mutex'
export class WAConnection extends Base {
@@ -70,53 +70,71 @@ export class WAConnection extends Base {
const read = await this.setQuery ([['read', attributes, null]])
return read
}
async fetchMessagesFromWA (jid: string, count: number, indexMessage?: { id?: string; fromMe?: boolean }, mostRecentFirst: boolean = true) {
const json = [
'query',
{
epoch: this.msgCount.toString(),
type: 'message',
jid: jid,
kind: mostRecentFirst ? 'before' : 'after',
count: count.toString(),
index: indexMessage?.id,
owner: indexMessage?.fromMe === false ? 'false' : 'true',
},
null,
]
const response = await this.query({json, binaryTags: [WAMetric.queryMessages, WAFlag.ignore], expect200: false})
return (response[2] as WANode[])?.map(item => item[2] as WAMessage) || []
}
/**
* Load the conversation with a group or person
* @param count the number of messages to load
* @param before the data for which message to offset the query by
* @param cursor the data for which message to offset the query by
* @param mostRecentFirst retreive the most recent message first or retreive from the converation start
*/
@Mutex ((jid, _, before, mostRecentFirst) => jid + (before?.id || '') + mostRecentFirst)
async loadMessages (
jid: string,
count: number,
before?: { id?: string; fromMe?: boolean },
cursor?: { id?: string; fromMe?: boolean },
mostRecentFirst: boolean = true
) {
jid = whatsappID(jid)
const retreive = async (count: number, indexMessage: any) => {
const json = [
'query',
{
epoch: this.msgCount.toString(),
type: 'message',
jid: jid,
kind: mostRecentFirst ? 'before' : 'after',
count: count.toString(),
index: indexMessage?.id,
owner: indexMessage?.fromMe === false ? 'false' : 'true',
},
null,
]
const response = await this.query({json, binaryTags: [WAMetric.queryMessages, WAFlag.ignore], expect200: false})
return (response[2] as WANode[])?.map(item => item[2] as WAMessage) || []
}
const retreive = (count: number, indexMessage: any) => this.fetchMessagesFromWA (jid, count, indexMessage, mostRecentFirst)
const chat = this.chats.get (jid)
const hasCursor = cursor?.id && typeof cursor?.fromMe !== 'undefined'
const cursorValue = hasCursor && chat.messages.get (GET_MESSAGE_ID(cursor))
let messages: WAMessage[]
if (!before && chat && mostRecentFirst) {
messages = chat.messages
if (chat && mostRecentFirst && (!hasCursor || cursorValue)) {
messages = chat.messages.paginatedByValue (cursorValue, count, null, 'before')
const diff = count - messages.length
if (diff < 0) {
messages = messages.slice(-count); // get the last X messages
messages = messages.slice(-count) // get the last X messages
} else if (diff > 0) {
let fepoch = (messages[0] && messages[0]['epoch']) || 0
const extra = await retreive (diff, messages[0]?.key)
// add to DB
for (let i = extra.length-1;i >= 0; i--) {
const m = extra[i]
fepoch -= 1
m['epoch'] = fepoch
if(chat.messages.length < this.maxCachedMessages && !chat.messages.get (WA_MESSAGE_ID(m))) {
chat.messages.insert(m)
}
}
messages.unshift (...extra)
}
} else messages = await retreive (count, before)
let cursor
} else messages = await retreive (count, cursor)
if (messages[0]) cursor = { id: messages[0].key.id, fromMe: messages[0].key.fromMe }
else cursor = null
return {messages, cursor}
}
/**
@@ -160,7 +178,7 @@ export class WAConnection extends Base {
*/
async findMessage (jid: string, chunkSize: number, onMessage: (m: WAMessage) => boolean) {
const chat = this.chats.get (whatsappID(jid))
let count = chat?.messages?.length || chunkSize
let count = chat?.messages?.all().length || chunkSize
let offsetID
while (true) {
const {messages, cursor} = await this.loadMessages(jid, count, offsetID, true)
@@ -196,13 +214,24 @@ export class WAConnection extends Base {
return messages
}
/** Load a single message specified by the ID */
async loadMessage (jid: string, messageID: string) {
// load the message before the given message
let messages = (await this.loadMessages (jid, 1, {id: messageID, fromMe: true})).messages
if (!messages[0]) messages = (await this.loadMessages (jid, 1, {id: messageID, fromMe: false})).messages
// the message after the loaded message is the message required
const actual = await this.loadMessages (jid, 1, messages[0] && messages[0].key, false)
return actual.messages[0]
async loadMessage (jid: string, id: string) {
let message: WAMessage
jid = whatsappID (jid)
const chat = this.chats.get (jid)
if (chat) {
// see if message is present in cache
message = chat.messages.get (GET_MESSAGE_ID({ id, fromMe: true })) || chat.messages.get (GET_MESSAGE_ID({ id, fromMe: false }))
}
if (!message) {
// load the message before the given message
let messages = (await this.loadMessages (jid, 1, {id, fromMe: true})).messages
if (!messages[0]) messages = (await this.loadMessages (jid, 1, {id, fromMe: false})).messages
// the message after the loaded message is the message required
const actual = await this.loadMessages (jid, 1, messages[0] && messages[0].key, false)
message = actual.messages[0]
}
return message
}
/**
* Search WhatsApp messages with a given text string
@@ -246,9 +275,9 @@ export class WAConnection extends Base {
const chat = this.chats.get (whatsappID(messageKey.remoteJid))
if (chat) {
chat.messages = chat.messages.filter (m => m.key.id !== messageKey.id)
const value = chat.messages.get (GET_MESSAGE_ID(messageKey))
value && chat.messages.delete (value)
}
return result
}
/**