Added mutex + fixed rare duplicate chats bug + fixed connect bug

The mutex will prevent duplicate functions from being called and throwing funky errors.
This commit is contained in:
Adhiraj
2020-09-09 14:16:08 +05:30
parent 6e22ceac98
commit 6d2eaf93cb
11 changed files with 211 additions and 25 deletions

View File

@@ -112,6 +112,14 @@ WAConnectionTest('Messages', conn => {
}
})
})
it('should not duplicate messages', async () => {
const results = await Promise.all ([
conn.loadMessages (testJid, 50),
conn.loadMessages (testJid, 50)
])
assert.deepEqual (results[0].messages, results[1].messages)
assert.ok (results[0].messages.length <= 50)
})
it('should deliver a message', async () => {
const waitForUpdate =
promiseTimeout(15000, resolve => {

111
src/Tests/Tests.Mutex.ts Normal file
View File

@@ -0,0 +1,111 @@
import { strict as assert } from 'assert'
import { Mutex } from '../WAConnection/Mutex'
const DEFAULT_WAIT = 1000
class MyClass {
didDoWork = false
values: { [k: string]: number } = {}
counter = 0
@Mutex ()
async myFunction () {
if (this.didDoWork) return
await new Promise (resolve => setTimeout(resolve, DEFAULT_WAIT))
if (this.didDoWork) {
throw new Error ('work already done')
}
this.didDoWork = true
}
@Mutex (key => key)
async myKeyedFunction (key: string) {
if (!this.values[key]) {
await new Promise (resolve => setTimeout(resolve, DEFAULT_WAIT))
if (this.values[key]) throw new Error ('value already set for ' + key)
this.values[key] = Math.floor(Math.random ()*100)
}
return this.values[key]
}
@Mutex (key => key)
async myQueingFunction (key: string) {
await new Promise (resolve => setTimeout(resolve, DEFAULT_WAIT))
}
@Mutex ()
async myErrorFunction () {
await new Promise (resolve => setTimeout(resolve, 100))
this.counter += 1
if (this.counter % 2 === 0) {
throw new Error ('failed')
}
}
}
describe ('garbage', () => {
it ('should only execute once', async () => {
const stuff = new MyClass ()
const start = new Date ()
await Promise.all ([...Array(1000)].map(() => stuff.myFunction ()))
const diff = new Date ().getTime()-start.getTime()
assert.ok (diff < DEFAULT_WAIT*1.25)
})
it ('should only execute once based on the key', async () => {
const stuff = new MyClass ()
const start = new Date ()
/*
In this test, the mutex will lock the function based on the key.
So, if the function with argument `key1` is underway
and another function with key `key1` is called,
the call is blocked till the first function completes.
However, if argument `key2` is passed, the function is allowed to pass.
*/
const keys = ['key1', 'key2', 'key3']
const duplicates = 1000
const results = await Promise.all (
keys.flatMap (key => (
[...Array(duplicates)].map(() => stuff.myKeyedFunction (key))
))
)
assert.deepEqual (
results.slice(0, duplicates).filter (r => r !== results[0]),
[]
)
const diff = new Date ().getTime()-start.getTime()
assert.ok (diff < DEFAULT_WAIT*1.25)
})
it ('should execute operations in a queue', async () => {
const stuff = new MyClass ()
const start = new Date ()
const keys = ['key1', 'key2', 'key3']
await Promise.all (
keys.flatMap (key => (
[...Array(2)].map(() => stuff.myQueingFunction (key))
))
)
const diff = new Date ().getTime()-start.getTime()
assert.ok (diff < DEFAULT_WAIT*2.2 && diff > DEFAULT_WAIT*1.5)
})
it ('should throw an error on selected items', async () => {
const stuff = new MyClass ()
const start = new Date ()
const WAIT = 100
const FUNCS = 40
const results = await Promise.all (
[...Array(FUNCS)].map(() => stuff.myErrorFunction ().catch(err => err.message))
)
const diff = new Date ().getTime()-start.getTime()
assert.ok (diff < WAIT*FUNCS*1.1)
assert.equal (
results.filter (r => r === 'failed').length,
FUNCS/2 // half should fail
)
})
})

View File

@@ -55,7 +55,7 @@ export class WAConnection extends EventEmitter {
/** Whether the phone is connected */
phoneConnected: boolean = false
maxCachedMessages = 25
maxCachedMessages = 50
chats: KeyedDB<WAChat> = new KeyedDB (Utils.waChatUniqueKey, value => value.jid)
contacts: { [k: string]: WAContact } = {}

View File

@@ -64,7 +64,17 @@ export class WAConnection extends Base {
// actual connect
const connect = () => {
const timeoutMs = options?.timeoutMs || 60*1000
let task = Utils.promiseTimeout(timeoutMs, (resolve, reject) => {
let cancel: () => void
const task = Utils.promiseTimeout(timeoutMs, (resolve, reject) => {
let task: Promise<any> = Promise.resolve ()
// add wait for chats promise if required
if (typeof options?.waitForChats === 'undefined' ? true : options?.waitForChats) {
const {waitForChats, cancelChats} = this.receiveChatsAndContacts()
task = waitForChats
cancel = cancelChats
}
// determine whether reconnect should be used or not
const shouldUseReconnect = this.lastDisconnectReason !== DisconnectReason.replaced &&
this.lastDisconnectReason !== DisconnectReason.unknown &&
@@ -76,30 +86,35 @@ export class WAConnection extends Base {
this.conn = new WS(WS_URL, null, { origin: DEFAULT_ORIGIN, timeout: timeoutMs, agent: options.agent })
this.conn.on('message', data => this.onMessageRecieved(data as any))
this.conn.on ('open', () => {
this.conn.on ('open', async () => {
this.log(`connected to WhatsApp Web server, authenticating via ${reconnectID ? 'reconnect' : 'takeover'}`, MessageLogLevel.info)
this.authenticate(reconnectID)
.then (resolve)
.then (() => (
this.conn
.removeAllListeners ('error')
.removeAllListeners ('close')
))
.catch (reject)
try {
task = Promise.all ([
task,
this.authenticate (reconnectID)
.then (
() => {
this.conn
.removeAllListeners ('error')
.removeAllListeners ('close')
}
)
])
const [result] = await task
resolve (result)
} catch (error) {
reject (error)
}
})
this.conn.on('error', reject)
this.conn.on('close', () => reject(new Error('close')))
})
let cancel: () => void
if (typeof options?.waitForChats === 'undefined' ? true : options?.waitForChats) {
const {waitForChats, cancelChats} = this.receiveChatsAndContacts()
task = Promise.all ([task, waitForChats]).then (([_, updates]) => updates)
cancel = cancelChats
}
task = task as Promise<void | { [k: string]: Partial<WAChat> }>
const rejectSafe = error => {
task = task.catch (() => {})
reject (error)
}
this.conn.on('error', rejectSafe)
this.conn.on('close', () => rejectSafe(new Error('close')))
}) as Promise<void | { [k: string]: Partial<WAChat> }>
return { promise: task, cancel }
}

View File

@@ -252,6 +252,10 @@ export class WAConnection extends Base {
case WA_MESSAGE_STUB_TYPE.GROUP_PARTICIPANT_ADD:
case WA_MESSAGE_STUB_TYPE.GROUP_PARTICIPANT_INVITE:
participants = message.messageStubParameters.map (whatsappID)
if (participants.includes(this.user.jid) && chat.read_only === 'true') {
delete chat.read_only
this.emit ('chat-update', { jid, read_only: 'false' })
}
this.emit ('group-participants-add', { jid, participants, actor })
break
case WA_MESSAGE_STUB_TYPE.GROUP_CHANGE_ANNOUNCE:

View File

@@ -7,6 +7,7 @@ import {
WAFlag,
} from '../WAConnection/Constants'
import { generateProfilePicture, waChatUniqueKey, whatsappID, unixTimestampSeconds } from './Utils'
import { Mutex } from './Mutex'
// All user related functions -- get profile picture, set status etc.
@@ -108,6 +109,12 @@ export class WAConnection extends Base {
const cursor = (chats[chats.length-1] && chats.length >= count) ? waChatUniqueKey (chats[chats.length-1]) : null
return { chats, cursor }
}
/**
* Update the profile picture
* @param jid
* @param img
*/
@Mutex (jid => jid)
async updateProfilePicture (jid: string, img: Buffer) {
jid = whatsappID (jid)
const data = await generateProfilePicture (img)
@@ -133,6 +140,7 @@ export class WAConnection extends Base {
* @param jid the ID of the person/group you are modifiying
* @param durationMs only for muting, how long to mute the chat for
*/
@Mutex ((jid, type) => jid+type)
async modifyChat (jid: string, type: ChatModification, durationMs?: number) {
jid = whatsappID (jid)
const chat = this.assertChatGet (jid)

View File

@@ -12,6 +12,7 @@ import {
WAMessageContent, WAMetric, WAFlag, WAMessage, BaileysError, MessageLogLevel, WA_MESSAGE_STATUS_TYPE, WAMessageProto, MediaConnInfo
} from './Constants'
import { generateMessageID, sha256, hmacSign, aesEncrypWithIV, randomBytes, generateThumbnail, getMediaKeys, decodeMediaMessageBuffer, extensionForMediaMessage, whatsappID, unixTimestampSeconds } from './Utils'
import { Mutex } from './Mutex'
export class WAConnection extends Base {
/**
@@ -194,6 +195,7 @@ export class WAConnection extends Base {
* You may need to call this when the message is old & the content is deleted off of the WA servers
* @param message
*/
@Mutex (message => message?.key?.id)
async updateMediaMessage (message: WAMessage) {
const content = message.message?.audioMessage || message.message?.videoMessage || message.message?.imageMessage || message.message?.stickerMessage || message.message?.documentMessage
if (!content) throw new BaileysError (`given message ${message.key.id} is not a media message`, message)
@@ -206,6 +208,7 @@ export class WAConnection extends Base {
* Securely downloads the media from the message.
* Renews the download url automatically, if necessary.
*/
@Mutex (message => message?.key?.id)
async downloadMediaMessage (message: WAMessage) {
try {
const buff = await decodeMediaMessageBuffer (message.message, this.fetchRequest)

View File

@@ -8,9 +8,11 @@ import {
WAMessageContent, WAMetric, WAFlag, WANode, WAMessage, WAMessageProto, BaileysError, MessageLogLevel, WA_MESSAGE_STATUS_TYPE
} from './Constants'
import { whatsappID, delay, toNumber, unixTimestampSeconds } from './Utils'
import { Mutex } from './Mutex'
export class WAConnection extends Base {
@Mutex ()
async loadAllUnreadMessages () {
const tasks = this.chats.all()
.filter(chat => chat.count > 0)
@@ -20,8 +22,8 @@ export class WAConnection extends Base {
list.forEach (({messages}) => combined.push(...messages))
return combined
}
/** Get the message info, who has read it, who its been delivered to */
@Mutex ((jid, messageID) => jid+messageID)
async messageInfo (jid: string, messageID: string) {
const query = ['query', {type: 'message_info', index: messageID, jid: jid, epoch: this.msgCount.toString()}, null]
const response = (await this.query ({json: query, binaryTags: [22, WAFlag.ignore], expect200: true}))[2]
@@ -45,6 +47,7 @@ export class WAConnection extends Base {
* @param jid the ID of the person/group whose message you want to mark read
* @param unread unreads the chat, if true
*/
@Mutex (jid => jid)
async chatRead (jid: string, type: 'unread' | 'read' = 'read') {
jid = whatsappID (jid)
const chat = this.assertChatGet (jid)
@@ -99,6 +102,7 @@ export class WAConnection extends Base {
* @param before the data for which message to offset the query by
* @param mostRecentFirst retreive the most recent message first or retreive from the converation start
*/
@Mutex ((jid, _, before, mostRecentFirst) => jid + (before?.id || '') + mostRecentFirst)
async loadMessages (
jid: string,
count: number,
@@ -271,6 +275,7 @@ export class WAConnection extends Base {
* Delete a message in a chat for yourself
* @param messageKey key of the message you want to delete
*/
@Mutex (m => m.remoteJid)
async clearMessage (messageKey: WAMessageKey) {
const tag = Math.round(Math.random ()*1000000)
const attrs: WANode = [
@@ -280,7 +285,14 @@ export class WAConnection extends Base {
['item', {owner: `${messageKey.fromMe}`, index: messageKey.id}, null]
]
]
return this.setQuery ([attrs])
const result = await this.setQuery ([attrs])
const chat = this.chats.get (whatsappID(messageKey.remoteJid))
if (chat) {
chat.messages = chat.messages.filter (m => m.key.id !== messageKey.id)
}
return result
}
/**
* Delete a message in a chat for everyone

24
src/WAConnection/Mutex.ts Normal file
View File

@@ -0,0 +1,24 @@
/**
* A simple mutex that can be used as a decorator. For examples, see Tests.Mutex.ts
* @param keyGetter if you want to lock functions based on certain arguments, specify the key for the function based on the arguments
*/
export function Mutex (keyGetter?: (...args: any[]) => string) {
let tasks: { [k: string]: Promise<void> } = {}
return function (_, __, descriptor: PropertyDescriptor) {
const originalMethod = descriptor.value
descriptor.value = function (this: Object, ...args) {
const key = (keyGetter && keyGetter.call(this, ...args)) || 'undefined'
tasks[key] = (async () => {
try {
tasks[key] && await tasks[key]
} catch {
}
const result = await originalMethod.call(this, ...args)
return result
})()
return tasks[key]
}
}
}

View File

@@ -2,6 +2,7 @@
"compilerOptions": {
"target": "es2018",
"module": "commonjs",
"experimentalDecorators": true,
"allowJs": true,
"checkJs": false,
"outDir": "lib",

View File

@@ -2,7 +2,7 @@
# yarn lockfile v1
"@adiwajshing/keyed-db@^0.1.2":
"@adiwajshing/keyed-db@^0.1.4":
version "0.1.4"
resolved "https://registry.yarnpkg.com/@adiwajshing/keyed-db/-/keyed-db-0.1.4.tgz#ed82563d8738cf2877a3c7c53a8739aa52f5efbf"
integrity sha512-Sed2xppK0b1Ayfwy+ONl4GKTUahzIRmaxHULzraPbIiRx9QHOQ3PJon9ZT1qtkebGdSRX9uHvwruz2VU03/sCQ==