From 218c9bcc18b611d5b320fbbf994afc05f27ec6ce Mon Sep 17 00:00:00 2001 From: Adhiraj Date: Tue, 1 Sep 2020 12:13:25 +0530 Subject: [PATCH] Added function to change read status for chat, updated connect --- Example/example.ts | 2 +- README.md | 6 +- package.json | 2 +- src/Tests/Tests.Connect.ts | 146 +++++++++++++++++++++++----- src/Tests/Tests.Misc.ts | 8 +- src/WAConnection/0.Base.ts | 16 +-- src/WAConnection/3.Connect.ts | 82 +++++++--------- src/WAConnection/7.MessagesExtra.ts | 49 ++++++++-- src/WAConnection/Utils.ts | 22 ++--- 9 files changed, 222 insertions(+), 111 deletions(-) diff --git a/Example/example.ts b/Example/example.ts index 5a23f76..685ad7c 100644 --- a/Example/example.ts +++ b/Example/example.ts @@ -92,7 +92,7 @@ async function example() { } // send a reply after 3 seconds setTimeout(async () => { - await conn.sendReadReceipt(m.key.remoteJid, m.key.id) // send read receipt + await conn.chatRead(m.key.remoteJid) // mark chat read await conn.updatePresence(m.key.remoteJid, Presence.available) // tell them we're available await conn.updatePresence(m.key.remoteJid, Presence.composing) // tell them we're composing diff --git a/README.md b/README.md index 4a34cf7..929dd6c 100644 --- a/README.md +++ b/README.md @@ -224,10 +224,8 @@ await conn.forwardMessage ('455@s.whatsapp.net', message) // WA forward the mess const id = '1234-123@g.us' const messageID = 'AHASHH123123AHGA' // id of the message you want to read -await conn.sendReadReceipt (id) // mark all messages in chat as read -await conn.sendReadReceipt(id, messageID, 1) // mark the mentioned message as read - -await conn.sendReadReceipt(id, null, -2) // mark the chat as unread +await conn.chatRead (id) // mark all messages in chat as read (equivalent of opening a chat in WA) +await conn.chatRead (id, 'unread') // mark the chat as unread ``` The message ID is the unique identifier of the message that you are marking as read. On a `WAMessage`, the `messageID` can be accessed using ```messageID = message.key.id```. diff --git a/package.json b/package.json index ca7cbff..e3b1f99 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,7 @@ "automation" ], "scripts": { - "test": "mocha --timeout 60000 -r ts-node/register src/Tests/Tests.*.ts", + "test": "mocha --timeout 240000 -r ts-node/register src/Tests/Tests.*.ts", "prepare": "npm run build:tsc", "lint": "eslint '*/*.ts' --quiet --fix", "build:tsc": "tsc", diff --git a/src/Tests/Tests.Connect.ts b/src/Tests/Tests.Connect.ts index 595b53a..a18d011 100644 --- a/src/Tests/Tests.Connect.ts +++ b/src/Tests/Tests.Connect.ts @@ -1,6 +1,6 @@ import * as assert from 'assert' import {WAConnection} from '../WAConnection/WAConnection' -import { AuthenticationCredentialsBase64, BaileysError, ReconnectMode } from '../WAConnection/Constants' +import { AuthenticationCredentialsBase64, BaileysError, ReconnectMode, DisconnectReason } from '../WAConnection/Constants' import { delay } from '../WAConnection/Utils' describe('QR Generation', () => { @@ -40,31 +40,6 @@ describe('Test Connect', () => { conn.close() auth = conn.base64EncodedAuthInfo() }) - /** - * the idea is to test closing the connection at multiple points in the connection - * and see if the library cleans up resources correctly - */ - it('should cleanup correctly', async () => { - const conn = new WAConnection() - conn.loadAuthInfo ('./auth_info.json') - - let timeout = 0.1 - while (true) { - let tmout = setTimeout (() => conn.close(), timeout*1000) - try { - await conn.connect () - - clearTimeout (tmout) - conn.close () - - break - } catch (error) { - - } - // exponentially increase the timeout disconnect - timeout *= 2 - } - }) it('should reconnect', async () => { const conn = new WAConnection() conn.connectOptions.timeoutMs = 20*1000 @@ -98,6 +73,14 @@ describe('Test Connect', () => { }) }) describe ('Reconnects', () => { + const verifyConnectionOpen = async (conn: WAConnection) => { + // check that the connection stays open + conn.on ('close', ({reason}) => ( + reason !== DisconnectReason.intentional && assert.fail ('should not have closed again') + )) + await delay (60*1000) + conn.close () + } it ('should disconnect & reconnect phone', async () => { const conn = new WAConnection () await conn.loadAuthInfo('./auth_info.json').connect () @@ -120,6 +103,68 @@ describe ('Reconnects', () => { conn.close () } }) + /** + * the idea is to test closing the connection at multiple points in the connection + * and see if the library cleans up resources correctly + */ + it('should cleanup correctly', async () => { + const conn = new WAConnection() + conn.autoReconnect = ReconnectMode.onAllErrors + conn.loadAuthInfo ('./auth_info.json') + + let timeout = 0.1 + while (true) { + let tmout = setTimeout (() => conn.close(), timeout*1000) + try { + await conn.connect () + clearTimeout (tmout) + break + } catch (error) { + + } + // exponentially increase the timeout disconnect + timeout *= 2 + } + conn.on ('close', ({reason}) => ( + // with v fast successive connections, WA sometimes incorrectly classifies a connection as taken over + (reason !== DisconnectReason.intentional && reason !== DisconnectReason.replaced) && + assert.fail ('should not have closed again') + )) + await delay (90*1000) + conn.close () + }) + /** + * the idea is to test closing the connection at multiple points in the connection + * and see if the library cleans up resources correctly + */ + it('should cleanup correctly 2', async () => { + const conn = new WAConnection() + conn.autoReconnect = ReconnectMode.onAllErrors + conn.connectOptions.timeoutMs = 20000 + conn.loadAuthInfo ('./auth_info.json') + + let timeout = 1000 + let tmout + const endConnection = async () => { + while (!conn['conn']) { + await delay(100) + } + conn['conn'].terminate () + + while (conn['conn']) { + await delay(100) + } + + timeout *= 2 + tmout = setTimeout (endConnection, timeout) + } + tmout = setTimeout (endConnection, timeout) + + await conn.connect () + clearTimeout (tmout) + + await verifyConnectionOpen (conn) + }) it ('should reconnect on broken connection', async () => { const conn = new WAConnection () conn.autoReconnect = ReconnectMode.onConnectionLost @@ -172,7 +217,56 @@ describe ('Reconnects', () => { conn.close () } }) + it ('should disrupt connect loop', async () => { + const conn = new WAConnection () + conn.loadAuthInfo ('./auth_info.json') + conn.connectOptions.maxRetries = 20 + conn.connectOptions.timeoutMs = 20*1000 + + delay (3000) + .then (() => conn.close()) + + await assert.rejects( conn.connect () ) + + console.log ('rejected correctly') + + delay (3000) + .then (() => conn['conn'].terminate()) + .then (async () => { + while (conn['conn']) { + await delay(100) + } + console.log ('destroyed WS') + }) + .then (() => delay(5000)) + .then (() => conn['conn'].terminate()) + + await conn.connect () + + console.log ('opened connection') + + await verifyConnectionOpen (conn) + }) + it ('should reconnect & stay connected', async () => { + const conn = new WAConnection () + conn.autoReconnect = ReconnectMode.onConnectionLost + + await conn.loadAuthInfo('./auth_info.json').connect () + assert.equal (conn.phoneConnected, true) + + await delay (30*1000) + + conn['conn']?.terminate () + + conn.on ('close', () => { + assert.ok (!conn['lastSeen']) + console.log ('connection closed') + }) + await new Promise (resolve => conn.on ('open', resolve)) + await verifyConnectionOpen (conn) + }) }) + describe ('Pending Requests', () => { it('should queue requests when closed', async () => { const conn = new WAConnection () diff --git a/src/Tests/Tests.Misc.ts b/src/Tests/Tests.Misc.ts index 12b3296..e7c3c88 100644 --- a/src/Tests/Tests.Misc.ts +++ b/src/Tests/Tests.Misc.ts @@ -70,7 +70,7 @@ WAConnectionTest('Misc', (conn) => { const response = await conn.updatePresence(testJid, Presence.composing) assert.ok(response) }) - it('should mark a chat unread', async () => { + it('should change a chat read status', async () => { const waitForEvent = new Promise (resolve => { conn.on ('chat-update', ({jid, count}) => { if (jid === testJid) { @@ -80,8 +80,12 @@ WAConnectionTest('Misc', (conn) => { } }) }) - await conn.sendReadReceipt(testJid, null, -2) + await conn.chatRead (testJid, 'unread') await waitForEvent + + await delay (5000) + + await conn.chatRead (testJid, 'read') }) it('should archive & unarchive', async () => { await conn.modifyChat (testJid, ChatModification.archive) diff --git a/src/WAConnection/0.Base.ts b/src/WAConnection/0.Base.ts index 6997654..a64e8f0 100644 --- a/src/WAConnection/0.Base.ts +++ b/src/WAConnection/0.Base.ts @@ -309,23 +309,27 @@ export class WAConnection extends EventEmitter { this.state = 'close' this.msgCount = 0 - this.conn?.removeAllListeners ('close') - this.conn?.close() - this.conn = null this.phoneConnected = false this.lastDisconnectReason = reason - this.lastSeen = null + + + this.endConnection () if (reason === 'invalid_session' || reason === 'intentional') { this.pendingRequests.forEach (({reject}) => reject(new Error('close'))) this.pendingRequests = [] } - this.removePendingCallbacks () + // reconnecting if the timeout is active for the reconnect loop this.emit ('close', { reason, isReconnecting }) } - protected removePendingCallbacks () { + protected endConnection () { + this.conn?.removeAllListeners ('close') + this.conn?.close() + this.conn = null + this.lastSeen = null + Object.keys(this.callbacks).forEach(key => { if (!key.includes('function:')) { this.log (`cancelling message wait: ${key}`, MessageLogLevel.info) diff --git a/src/WAConnection/3.Connect.ts b/src/WAConnection/3.Connect.ts index 8eab8a6..dff2e3a 100644 --- a/src/WAConnection/3.Connect.ts +++ b/src/WAConnection/3.Connect.ts @@ -1,5 +1,5 @@ import * as Utils from './Utils' -import { WAMessage, WAChat, MessageLogLevel, WANode, KEEP_ALIVE_INTERVAL_MS, BaileysError, WAConnectOptions, DisconnectReason, UNAUTHORIZED_CODES, WAContact, TimedOutError } from './Constants' +import { WAMessage, WAChat, MessageLogLevel, WANode, KEEP_ALIVE_INTERVAL_MS, BaileysError, WAConnectOptions, DisconnectReason, UNAUTHORIZED_CODES, WAContact, TimedOutError, CancelledError } from './Constants' import {WAConnection as Base} from './1.Validation' import Decoder from '../Binary/Decoder' @@ -53,18 +53,28 @@ export class WAConnection extends Base { protected async connectInternal (options: WAConnectOptions, delayMs?: number) { // actual connect const connect = () => { - const tasks: Promise[] = [] const timeoutMs = options?.timeoutMs || 60*1000 const { ws, cancel } = Utils.openWebSocketConnection (5000, false) + let task = ws + .then (conn => this.conn = conn) + .then (() => this.conn.on('message', data => this.onMessageRecieved(data as any))) + .then (() => ( + this.log(`connected to WhatsApp Web server, authenticating via ${reconnectID ? 'reconnect' : 'takeover'}`, MessageLogLevel.info) + )) + .then (() => this.authenticate(reconnectID)) + .then (() => { + this.conn + .removeAllListeners ('error') + .removeAllListeners('close') + }) + let cancelTask: () => void if (typeof options?.waitForChats === 'undefined' ? true : options?.waitForChats) { + const {waitForChats, cancelChats} = this.receiveChatsAndContacts(true) - const {waitForChats, cancelChats} = this.receiveChatsAndContacts(timeoutMs, true) - tasks.push (waitForChats) - - cancellations.push (cancelChats) + task = Promise.all ([task, waitForChats]).then (() => {}) cancelTask = () => { cancelChats(); cancel() } } else cancelTask = cancel @@ -74,33 +84,15 @@ export class WAConnection extends Base { this.lastDisconnectReason !== DisconnectReason.intentional && this.user?.jid const reconnectID = shouldUseReconnect ? this.user.jid.replace ('@s.whatsapp.net', '@c.us') : null - const promise = Utils.promiseTimeout(timeoutMs, (resolve, reject) => { - ws - .then (conn => this.conn = conn) - .then (() => this.conn.on('message', data => this.onMessageRecieved(data as any))) - .then (() => ( - this.log(`connected to WhatsApp Web server, authenticating via ${reconnectID ? 'reconnect' : 'takeover'}`, MessageLogLevel.info) - )) - .then (() => this.authenticate(reconnectID)) - .then (() => ( - this.conn - .removeAllListeners ('error') - .removeAllListeners('close') - )) - .then (resolve) - .catch (reject) - }) + const promise = Utils.promiseTimeout(timeoutMs, (resolve, reject) => ( + task.then (resolve).catch (reject) + )) .catch (err => { - this.removePendingCallbacks () + this.endConnection () throw err }) as Promise - tasks.push (promise) - - return { - promise: Promise.all (tasks), - cancel: cancelTask - } + return { promise, cancel: cancelTask } } let promise = Promise.resolve () @@ -120,7 +112,6 @@ export class WAConnection extends Base { .then (() => { const {promise, cancel} = connect () cancellations.push (cancel) - return promise }) .finally (() => { @@ -133,7 +124,7 @@ export class WAConnection extends Base { * Must be called immediately after connect * @returns [chats, contacts] */ - protected receiveChatsAndContacts(timeoutMs: number = null, stopAfterMostRecentMessage: boolean=false) { + protected receiveChatsAndContacts(stopAfterMostRecentMessage: boolean=false) { this.contacts = {} this.chats.clear () @@ -222,23 +213,22 @@ export class WAConnection extends Base { }) // wait for the chats & contacts to load - const {delay, cancel} = Utils.delayCancellable (timeoutMs) - - const waitForChats = Promise.race ([ - new Promise (resolve => resolveTask = resolve), - delay.then (() => { throw TimedOutError() }) - ]) - .then (() => ( - this.chats - .all () - .forEach (chat => { - const respectiveContact = this.contacts[chat.jid] - chat.name = respectiveContact?.name || respectiveContact?.notify || chat.name + let cancelChats: () => void + const waitForChats = new Promise ((resolve, reject) => { + resolveTask = resolve + cancelChats = () => reject (CancelledError()) }) - )) - .finally (deregisterCallbacks) + .then (() => ( + this.chats + .all () + .forEach (chat => { + const respectiveContact = this.contacts[chat.jid] + chat.name = respectiveContact?.name || respectiveContact?.notify || chat.name + }) + )) + .finally (deregisterCallbacks) - return { waitForChats, cancelChats: cancel } + return { waitForChats, cancelChats } } private releasePendingRequests () { this.pendingRequests.forEach (({resolve}) => resolve()) // send off all pending request diff --git a/src/WAConnection/7.MessagesExtra.ts b/src/WAConnection/7.MessagesExtra.ts index bb20a2e..733117e 100644 --- a/src/WAConnection/7.MessagesExtra.ts +++ b/src/WAConnection/7.MessagesExtra.ts @@ -41,16 +41,49 @@ export class WAConnection extends Base { return info } /** - * Read/unread messages of a chat; will mark the entire chat read by default + * Marks a chat as read/unread; updates the chat object too + * @param jid the ID of the person/group whose message you want to mark read + * @param unread unreads the chat, if true + */ + async chatRead (jid: string, type: 'unread' | 'read' = 'read') { + jid = whatsappID (jid) + const chat = this.assertChatGet (jid) + + if (type === 'unread') await this.sendReadReceipt (jid, null, -2) + else if (chat.count !== 0) { + let messageID: string + + let messages: WAMessage[] + let cursor: any + + messages = chat.messages + cursor = messages[messages.length-1]?.key + + do { + const m = messages.reverse().find (m => !m.key.fromMe) + if (m) messageID = m.key.id + + const obj = await this.loadMessages (jid, 10, cursor) + messages = obj.messages + cursor = obj.cursor + + if (messages.length === 0) throw new BaileysError ('no valid message found to read', { status: 404 }) + } while (!messageID) + + await this.sendReadReceipt (jid, messageID, Math.abs(chat.count)) + } + + chat.count = type === 'unread' ? -1 : 0 + this.emit ('chat-update', {jid, count: chat.count}) + } + /** + * Sends a read receipt for a given message; + * does not update the chat do @see chatRead * @param jid the ID of the person/group whose message you want to mark read * @param messageID optionally, the message ID * @param count number of messages to read, set to < 0 to unread a message */ - async sendReadReceipt(jid: string, messageID?: string, count?: number) { - jid = whatsappID (jid) - const chat = this.chats.get(jid) - count = count || Math.abs(chat?.count || 1) - + async sendReadReceipt(jid: string, messageID: string, count: number) { const attributes = { jid: jid, count: count.toString(), @@ -58,10 +91,6 @@ export class WAConnection extends Base { owner: messageID ? 'false' : null } const read = await this.setQuery ([['read', attributes, null]]) - if (chat) { - chat.count = count > 0 ? Math.max(chat.count-count, 0) : -1 - this.emit ('chat-update', {jid, count: chat.count}) - } return read } /** diff --git a/src/WAConnection/Utils.ts b/src/WAConnection/Utils.ts index b77c81a..1500df3 100644 --- a/src/WAConnection/Utils.ts +++ b/src/WAConnection/Utils.ts @@ -88,24 +88,16 @@ export async function promiseTimeout(ms: number, promise: (resolve: (v?: T)=> if (!ms) return new Promise (promise) // Create a promise that rejects in milliseconds - const {delay, cancel} = delayCancellable (ms) - - let pReject: (error) => void + let {delay, cancel} = delayCancellable (ms) const p = new Promise ((resolve, reject) => { + delay + .then(() => reject(TimedOutError())) + .catch (err => reject(err)) + promise (resolve, reject) - pReject = reject }) - - try { - const content = await Promise.race([ - p, - delay.then(() => pReject(TimedOutError())) - ]) - cancel () - return content as T - } finally { - cancel () - } + .finally (cancel) + return p as Promise } export const openWebSocketConnection = (timeoutMs: number, retryOnNetworkError: boolean) => {