From 6d2eaf93cb0c7ad5a87464418cdf1505efec5e4c Mon Sep 17 00:00:00 2001 From: Adhiraj Date: Wed, 9 Sep 2020 14:16:08 +0530 Subject: [PATCH] Added mutex + fixed rare duplicate chats bug + fixed connect bug The mutex will prevent duplicate functions from being called and throwing funky errors. --- src/Tests/Tests.Messages.ts | 8 ++ src/Tests/Tests.Mutex.ts | 111 ++++++++++++++++++++++++++++ src/WAConnection/0.Base.ts | 2 +- src/WAConnection/3.Connect.ts | 57 ++++++++------ src/WAConnection/4.Events.ts | 4 + src/WAConnection/5.User.ts | 8 ++ src/WAConnection/6.MessagesSend.ts | 3 + src/WAConnection/7.MessagesExtra.ts | 16 +++- src/WAConnection/Mutex.ts | 24 ++++++ tsconfig.json | 1 + yarn.lock | 2 +- 11 files changed, 211 insertions(+), 25 deletions(-) create mode 100644 src/Tests/Tests.Mutex.ts create mode 100644 src/WAConnection/Mutex.ts diff --git a/src/Tests/Tests.Messages.ts b/src/Tests/Tests.Messages.ts index b149790..f581e31 100644 --- a/src/Tests/Tests.Messages.ts +++ b/src/Tests/Tests.Messages.ts @@ -112,6 +112,14 @@ WAConnectionTest('Messages', conn => { } }) }) + it('should not duplicate messages', async () => { + const results = await Promise.all ([ + conn.loadMessages (testJid, 50), + conn.loadMessages (testJid, 50) + ]) + assert.deepEqual (results[0].messages, results[1].messages) + assert.ok (results[0].messages.length <= 50) + }) it('should deliver a message', async () => { const waitForUpdate = promiseTimeout(15000, resolve => { diff --git a/src/Tests/Tests.Mutex.ts b/src/Tests/Tests.Mutex.ts new file mode 100644 index 0000000..ae4f6d6 --- /dev/null +++ b/src/Tests/Tests.Mutex.ts @@ -0,0 +1,111 @@ +import { strict as assert } from 'assert' +import { Mutex } from '../WAConnection/Mutex' + +const DEFAULT_WAIT = 1000 + +class MyClass { + didDoWork = false + values: { [k: string]: number } = {} + counter = 0 + + @Mutex () + async myFunction () { + if (this.didDoWork) return + + await new Promise (resolve => setTimeout(resolve, DEFAULT_WAIT)) + if (this.didDoWork) { + throw new Error ('work already done') + } + this.didDoWork = true + } + @Mutex (key => key) + async myKeyedFunction (key: string) { + if (!this.values[key]) { + await new Promise (resolve => setTimeout(resolve, DEFAULT_WAIT)) + if (this.values[key]) throw new Error ('value already set for ' + key) + this.values[key] = Math.floor(Math.random ()*100) + } + return this.values[key] + } + @Mutex (key => key) + async myQueingFunction (key: string) { + await new Promise (resolve => setTimeout(resolve, DEFAULT_WAIT)) + } + @Mutex () + async myErrorFunction () { + await new Promise (resolve => setTimeout(resolve, 100)) + this.counter += 1 + if (this.counter % 2 === 0) { + throw new Error ('failed') + } + } +} + +describe ('garbage', () => { + it ('should only execute once', async () => { + const stuff = new MyClass () + const start = new Date () + await Promise.all ([...Array(1000)].map(() => stuff.myFunction ())) + const diff = new Date ().getTime()-start.getTime() + assert.ok (diff < DEFAULT_WAIT*1.25) + }) + it ('should only execute once based on the key', async () => { + const stuff = new MyClass () + const start = new Date () + /* + In this test, the mutex will lock the function based on the key. + + So, if the function with argument `key1` is underway + and another function with key `key1` is called, + the call is blocked till the first function completes. + However, if argument `key2` is passed, the function is allowed to pass. + */ + const keys = ['key1', 'key2', 'key3'] + const duplicates = 1000 + const results = await Promise.all ( + keys.flatMap (key => ( + [...Array(duplicates)].map(() => stuff.myKeyedFunction (key)) + )) + ) + assert.deepEqual ( + results.slice(0, duplicates).filter (r => r !== results[0]), + [] + ) + + const diff = new Date ().getTime()-start.getTime() + assert.ok (diff < DEFAULT_WAIT*1.25) + }) + it ('should execute operations in a queue', async () => { + const stuff = new MyClass () + const start = new Date () + + const keys = ['key1', 'key2', 'key3'] + + await Promise.all ( + keys.flatMap (key => ( + [...Array(2)].map(() => stuff.myQueingFunction (key)) + )) + ) + + const diff = new Date ().getTime()-start.getTime() + assert.ok (diff < DEFAULT_WAIT*2.2 && diff > DEFAULT_WAIT*1.5) + }) + it ('should throw an error on selected items', async () => { + const stuff = new MyClass () + const start = new Date () + + const WAIT = 100 + const FUNCS = 40 + const results = await Promise.all ( + [...Array(FUNCS)].map(() => stuff.myErrorFunction ().catch(err => err.message)) + ) + + const diff = new Date ().getTime()-start.getTime() + assert.ok (diff < WAIT*FUNCS*1.1) + + assert.equal ( + results.filter (r => r === 'failed').length, + FUNCS/2 // half should fail + ) + }) +}) \ No newline at end of file diff --git a/src/WAConnection/0.Base.ts b/src/WAConnection/0.Base.ts index 9885754..3deaa22 100644 --- a/src/WAConnection/0.Base.ts +++ b/src/WAConnection/0.Base.ts @@ -55,7 +55,7 @@ export class WAConnection extends EventEmitter { /** Whether the phone is connected */ phoneConnected: boolean = false - maxCachedMessages = 25 + maxCachedMessages = 50 chats: KeyedDB = new KeyedDB (Utils.waChatUniqueKey, value => value.jid) contacts: { [k: string]: WAContact } = {} diff --git a/src/WAConnection/3.Connect.ts b/src/WAConnection/3.Connect.ts index a90f772..00c1b32 100644 --- a/src/WAConnection/3.Connect.ts +++ b/src/WAConnection/3.Connect.ts @@ -64,7 +64,17 @@ export class WAConnection extends Base { // actual connect const connect = () => { const timeoutMs = options?.timeoutMs || 60*1000 - let task = Utils.promiseTimeout(timeoutMs, (resolve, reject) => { + + let cancel: () => void + const task = Utils.promiseTimeout(timeoutMs, (resolve, reject) => { + let task: Promise = Promise.resolve () + // add wait for chats promise if required + if (typeof options?.waitForChats === 'undefined' ? true : options?.waitForChats) { + const {waitForChats, cancelChats} = this.receiveChatsAndContacts() + task = waitForChats + cancel = cancelChats + } + // determine whether reconnect should be used or not const shouldUseReconnect = this.lastDisconnectReason !== DisconnectReason.replaced && this.lastDisconnectReason !== DisconnectReason.unknown && @@ -76,30 +86,35 @@ export class WAConnection extends Base { this.conn = new WS(WS_URL, null, { origin: DEFAULT_ORIGIN, timeout: timeoutMs, agent: options.agent }) this.conn.on('message', data => this.onMessageRecieved(data as any)) - this.conn.on ('open', () => { + this.conn.on ('open', async () => { this.log(`connected to WhatsApp Web server, authenticating via ${reconnectID ? 'reconnect' : 'takeover'}`, MessageLogLevel.info) - this.authenticate(reconnectID) - .then (resolve) - .then (() => ( - this.conn - .removeAllListeners ('error') - .removeAllListeners ('close') - )) - .catch (reject) + try { + task = Promise.all ([ + task, + this.authenticate (reconnectID) + .then ( + () => { + this.conn + .removeAllListeners ('error') + .removeAllListeners ('close') + } + ) + ]) + const [result] = await task + resolve (result) + } catch (error) { + reject (error) + } }) - this.conn.on('error', reject) - this.conn.on('close', () => reject(new Error('close'))) - }) - let cancel: () => void - if (typeof options?.waitForChats === 'undefined' ? true : options?.waitForChats) { - const {waitForChats, cancelChats} = this.receiveChatsAndContacts() - task = Promise.all ([task, waitForChats]).then (([_, updates]) => updates) - cancel = cancelChats - } - - task = task as Promise }> + const rejectSafe = error => { + task = task.catch (() => {}) + reject (error) + } + this.conn.on('error', rejectSafe) + this.conn.on('close', () => rejectSafe(new Error('close'))) + }) as Promise }> return { promise: task, cancel } } diff --git a/src/WAConnection/4.Events.ts b/src/WAConnection/4.Events.ts index 9c1b1d4..afe92e9 100644 --- a/src/WAConnection/4.Events.ts +++ b/src/WAConnection/4.Events.ts @@ -252,6 +252,10 @@ export class WAConnection extends Base { case WA_MESSAGE_STUB_TYPE.GROUP_PARTICIPANT_ADD: case WA_MESSAGE_STUB_TYPE.GROUP_PARTICIPANT_INVITE: participants = message.messageStubParameters.map (whatsappID) + if (participants.includes(this.user.jid) && chat.read_only === 'true') { + delete chat.read_only + this.emit ('chat-update', { jid, read_only: 'false' }) + } this.emit ('group-participants-add', { jid, participants, actor }) break case WA_MESSAGE_STUB_TYPE.GROUP_CHANGE_ANNOUNCE: diff --git a/src/WAConnection/5.User.ts b/src/WAConnection/5.User.ts index 2969bc0..adae63c 100644 --- a/src/WAConnection/5.User.ts +++ b/src/WAConnection/5.User.ts @@ -7,6 +7,7 @@ import { WAFlag, } from '../WAConnection/Constants' import { generateProfilePicture, waChatUniqueKey, whatsappID, unixTimestampSeconds } from './Utils' +import { Mutex } from './Mutex' // All user related functions -- get profile picture, set status etc. @@ -108,6 +109,12 @@ export class WAConnection extends Base { const cursor = (chats[chats.length-1] && chats.length >= count) ? waChatUniqueKey (chats[chats.length-1]) : null return { chats, cursor } } + /** + * Update the profile picture + * @param jid + * @param img + */ + @Mutex (jid => jid) async updateProfilePicture (jid: string, img: Buffer) { jid = whatsappID (jid) const data = await generateProfilePicture (img) @@ -133,6 +140,7 @@ export class WAConnection extends Base { * @param jid the ID of the person/group you are modifiying * @param durationMs only for muting, how long to mute the chat for */ + @Mutex ((jid, type) => jid+type) async modifyChat (jid: string, type: ChatModification, durationMs?: number) { jid = whatsappID (jid) const chat = this.assertChatGet (jid) diff --git a/src/WAConnection/6.MessagesSend.ts b/src/WAConnection/6.MessagesSend.ts index 51ac4d0..c9f0842 100644 --- a/src/WAConnection/6.MessagesSend.ts +++ b/src/WAConnection/6.MessagesSend.ts @@ -12,6 +12,7 @@ import { WAMessageContent, WAMetric, WAFlag, WAMessage, BaileysError, MessageLogLevel, WA_MESSAGE_STATUS_TYPE, WAMessageProto, MediaConnInfo } from './Constants' import { generateMessageID, sha256, hmacSign, aesEncrypWithIV, randomBytes, generateThumbnail, getMediaKeys, decodeMediaMessageBuffer, extensionForMediaMessage, whatsappID, unixTimestampSeconds } from './Utils' +import { Mutex } from './Mutex' export class WAConnection extends Base { /** @@ -194,6 +195,7 @@ export class WAConnection extends Base { * You may need to call this when the message is old & the content is deleted off of the WA servers * @param message */ + @Mutex (message => message?.key?.id) async updateMediaMessage (message: WAMessage) { const content = message.message?.audioMessage || message.message?.videoMessage || message.message?.imageMessage || message.message?.stickerMessage || message.message?.documentMessage if (!content) throw new BaileysError (`given message ${message.key.id} is not a media message`, message) @@ -206,6 +208,7 @@ export class WAConnection extends Base { * Securely downloads the media from the message. * Renews the download url automatically, if necessary. */ + @Mutex (message => message?.key?.id) async downloadMediaMessage (message: WAMessage) { try { const buff = await decodeMediaMessageBuffer (message.message, this.fetchRequest) diff --git a/src/WAConnection/7.MessagesExtra.ts b/src/WAConnection/7.MessagesExtra.ts index 1f528f4..d88d9d6 100644 --- a/src/WAConnection/7.MessagesExtra.ts +++ b/src/WAConnection/7.MessagesExtra.ts @@ -8,9 +8,11 @@ import { WAMessageContent, WAMetric, WAFlag, WANode, WAMessage, WAMessageProto, BaileysError, MessageLogLevel, WA_MESSAGE_STATUS_TYPE } from './Constants' import { whatsappID, delay, toNumber, unixTimestampSeconds } from './Utils' +import { Mutex } from './Mutex' export class WAConnection extends Base { + @Mutex () async loadAllUnreadMessages () { const tasks = this.chats.all() .filter(chat => chat.count > 0) @@ -20,8 +22,8 @@ export class WAConnection extends Base { list.forEach (({messages}) => combined.push(...messages)) return combined } - /** Get the message info, who has read it, who its been delivered to */ + @Mutex ((jid, messageID) => jid+messageID) async messageInfo (jid: string, messageID: string) { const query = ['query', {type: 'message_info', index: messageID, jid: jid, epoch: this.msgCount.toString()}, null] const response = (await this.query ({json: query, binaryTags: [22, WAFlag.ignore], expect200: true}))[2] @@ -45,6 +47,7 @@ export class WAConnection extends Base { * @param jid the ID of the person/group whose message you want to mark read * @param unread unreads the chat, if true */ + @Mutex (jid => jid) async chatRead (jid: string, type: 'unread' | 'read' = 'read') { jid = whatsappID (jid) const chat = this.assertChatGet (jid) @@ -99,6 +102,7 @@ export class WAConnection extends Base { * @param before the data for which message to offset the query by * @param mostRecentFirst retreive the most recent message first or retreive from the converation start */ + @Mutex ((jid, _, before, mostRecentFirst) => jid + (before?.id || '') + mostRecentFirst) async loadMessages ( jid: string, count: number, @@ -271,6 +275,7 @@ export class WAConnection extends Base { * Delete a message in a chat for yourself * @param messageKey key of the message you want to delete */ + @Mutex (m => m.remoteJid) async clearMessage (messageKey: WAMessageKey) { const tag = Math.round(Math.random ()*1000000) const attrs: WANode = [ @@ -280,7 +285,14 @@ export class WAConnection extends Base { ['item', {owner: `${messageKey.fromMe}`, index: messageKey.id}, null] ] ] - return this.setQuery ([attrs]) + const result = await this.setQuery ([attrs]) + + const chat = this.chats.get (whatsappID(messageKey.remoteJid)) + if (chat) { + chat.messages = chat.messages.filter (m => m.key.id !== messageKey.id) + } + + return result } /** * Delete a message in a chat for everyone diff --git a/src/WAConnection/Mutex.ts b/src/WAConnection/Mutex.ts new file mode 100644 index 0000000..68c44a8 --- /dev/null +++ b/src/WAConnection/Mutex.ts @@ -0,0 +1,24 @@ +/** + * A simple mutex that can be used as a decorator. For examples, see Tests.Mutex.ts + * @param keyGetter if you want to lock functions based on certain arguments, specify the key for the function based on the arguments + */ +export function Mutex (keyGetter?: (...args: any[]) => string) { + let tasks: { [k: string]: Promise } = {} + return function (_, __, descriptor: PropertyDescriptor) { + const originalMethod = descriptor.value + descriptor.value = function (this: Object, ...args) { + const key = (keyGetter && keyGetter.call(this, ...args)) || 'undefined' + + tasks[key] = (async () => { + try { + tasks[key] && await tasks[key] + } catch { + + } + const result = await originalMethod.call(this, ...args) + return result + })() + return tasks[key] + } + } +} \ No newline at end of file diff --git a/tsconfig.json b/tsconfig.json index bbfeb76..6ccb298 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -2,6 +2,7 @@ "compilerOptions": { "target": "es2018", "module": "commonjs", + "experimentalDecorators": true, "allowJs": true, "checkJs": false, "outDir": "lib", diff --git a/yarn.lock b/yarn.lock index 984754a..d197cb4 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2,7 +2,7 @@ # yarn lockfile v1 -"@adiwajshing/keyed-db@^0.1.2": +"@adiwajshing/keyed-db@^0.1.4": version "0.1.4" resolved "https://registry.yarnpkg.com/@adiwajshing/keyed-db/-/keyed-db-0.1.4.tgz#ed82563d8738cf2877a3c7c53a8739aa52f5efbf" integrity sha512-Sed2xppK0b1Ayfwy+ONl4GKTUahzIRmaxHULzraPbIiRx9QHOQ3PJon9ZT1qtkebGdSRX9uHvwruz2VU03/sCQ==