handle tags via EventEmitter + replace 'intermediate-close' with 'ws-close'

This commit is contained in:
Adhiraj Singh
2020-10-30 17:42:45 +05:30
parent 9366c45842
commit 69da12c33c
9 changed files with 101 additions and 102 deletions

View File

@@ -183,8 +183,8 @@ on (event: 'open', listener: (result: WAOpenResult) => void): this
on (event: 'connecting', listener: () => void): this
/** when the connection has closed */
on (event: 'close', listener: (err: {reason?: DisconnectReason | string, isReconnecting: boolean}) => void): this
/** when the connection has closed */
on (event: 'intermediate-close', listener: (err: {reason?: DisconnectReason | string}) => void): this
/** when the socket has closed */
on (event: 'ws-close', listener: (err: {reason?: DisconnectReason | string}) => void): this
/** when WA updates the credentials */
on (event: 'credentials-updated', listener: (auth: AuthenticationCredentials) => void): this
/** when a new QR is generated, ready for scanning */

View File

@@ -5,6 +5,31 @@ import {promises as fs} from 'fs'
require ('dotenv').config () // dotenv to load test jid
export const testJid = process.env.TEST_JID || '1234@s.whatsapp.net' // set TEST_JID=xyz@s.whatsapp.net in a .env file in the root directory
export const makeConnection = () => {
const conn = new WAConnection()
conn.connectOptions.maxIdleTimeMs = 30_000
conn.logger.level = 'debug'
let evCounts = {}
conn.on ('close', ({ isReconnecting }) => {
!isReconnecting && console.log ('Events registered: ', evCounts)
})
const onM = conn.on
conn.on = (...args: any[]) => {
evCounts[args[0]] = (evCounts[args[0]] || 0) + 1
return onM.apply (conn, args)
}
const offM = conn.off
conn.off = (...args: any[]) => {
evCounts[args[0]] = (evCounts[args[0]] || 0) - 1
if (evCounts[args[0]] <= 0) delete evCounts[args[0]]
return offM.apply (conn, args)
}
return conn
}
export async function sendAndRetreiveMessage(conn: WAConnection, content, type: MessageType, options: MessageOptions = {}) {
const response = await conn.sendMessage(testJid, content, type, options)
const {messages} = await conn.loadMessages(testJid, 10)

View File

@@ -2,11 +2,11 @@ import * as assert from 'assert'
import {WAConnection} from '../WAConnection/WAConnection'
import { AuthenticationCredentialsBase64, BaileysError, ReconnectMode, DisconnectReason } from '../WAConnection/Constants'
import { delay } from '../WAConnection/Utils'
import { assertChatDBIntegrity, testJid } from './Common'
import { assertChatDBIntegrity, makeConnection, testJid } from './Common'
describe('QR Generation', () => {
it('should generate QR', async () => {
const conn = new WAConnection()
const conn = makeConnection ()
conn.connectOptions.regenerateQRIntervalMs = 5000
let calledQR = 0
@@ -32,7 +32,7 @@ describe('Test Connect', () => {
it('should connect', async () => {
console.log('please be ready to scan with your phone')
const conn = new WAConnection()
const conn = makeConnection ()
let credentialsUpdateCalled = false
conn.on ('credentials-updated', () => credentialsUpdateCalled = true)
@@ -48,8 +48,8 @@ describe('Test Connect', () => {
conn.close()
auth = conn.base64EncodedAuthInfo()
})
it('should reconnect', async () => {
const conn = new WAConnection()
it('should restore session', async () => {
const conn = makeConnection ()
let credentialsUpdateCalled = false
conn.on ('credentials-updated', () => credentialsUpdateCalled = true)
@@ -72,7 +72,7 @@ describe('Test Connect', () => {
conn.close()
})
it ('should disconnect & reconnect phone', async () => {
const conn = new WAConnection ()
const conn = makeConnection ()
conn.logger.level = 'debug'
await conn.loadAuthInfo('./auth_info.json').connect ()
assert.equal (conn.phoneConnected, true)
@@ -132,14 +132,14 @@ describe ('Reconnects', () => {
if (failed) assert.fail ('should not have closed again')
}
it('should dispose correctly on bad_session', async () => {
const conn = new WAConnection()
const conn = makeConnection ()
conn.autoReconnect = ReconnectMode.onAllErrors
conn.loadAuthInfo ('./auth_info.json')
let gotClose0 = false
let gotClose1 = false
conn.on ('intermediate-close', ({ reason }) => {
conn.on ('ws-close', ({ reason }) => {
gotClose0 = true
})
conn.on ('close', ({ reason }) => {
@@ -164,7 +164,7 @@ describe ('Reconnects', () => {
* and see if the library cleans up resources correctly
*/
it('should cleanup correctly', async () => {
const conn = new WAConnection()
const conn = makeConnection ()
conn.autoReconnect = ReconnectMode.onAllErrors
conn.loadAuthInfo ('./auth_info.json')
@@ -188,7 +188,7 @@ describe ('Reconnects', () => {
* and see if the library cleans up resources correctly
*/
it('should disrupt connect loop', async () => {
const conn = new WAConnection()
const conn = makeConnection ()
conn.autoReconnect = ReconnectMode.onAllErrors
conn.loadAuthInfo ('./auth_info.json')
@@ -216,7 +216,7 @@ describe ('Reconnects', () => {
await verifyConnectionOpen (conn)
})
it ('should reconnect on broken connection', async () => {
const conn = new WAConnection ()
const conn = makeConnection ()
conn.autoReconnect = ReconnectMode.onConnectionLost
await conn.loadAuthInfo('./auth_info.json').connect ()
@@ -268,7 +268,7 @@ describe ('Reconnects', () => {
}
})
it ('should reconnect & stay connected', async () => {
const conn = new WAConnection ()
const conn = makeConnection ()
conn.autoReconnect = ReconnectMode.onConnectionLost
await conn.loadAuthInfo('./auth_info.json').connect ()
@@ -289,7 +289,7 @@ describe ('Reconnects', () => {
describe ('Pending Requests', () => {
it ('should correctly send updates', async () => {
const conn = new WAConnection ()
const conn = makeConnection ()
conn.pendingRequestTimeoutMs = null
conn.loadAuthInfo('./auth_info.json')
@@ -323,7 +323,7 @@ describe ('Pending Requests', () => {
conn.close ()
})
it('should queue requests when closed', async () => {
const conn = new WAConnection ()
const conn = makeConnection ()
//conn.pendingRequestTimeoutMs = null
await conn.loadAuthInfo('./auth_info.json').connect ()

View File

@@ -27,7 +27,6 @@ import { EventEmitter } from 'events'
import KeyedDB from '@adiwajshing/keyed-db'
import { STATUS_CODES, Agent } from 'http'
import pino from 'pino'
import { rejects } from 'assert'
const logger = pino({ prettyPrint: { levelFirst: true, ignore: 'hostname', translateTime: true }, prettifier: require('pino-pretty') })
@@ -79,7 +78,6 @@ export class WAConnection extends EventEmitter {
protected conn: WS = null
protected msgCount = 0
protected keepAliveReq: NodeJS.Timeout
protected callbacks: {[k: string]: any} = {}
protected encoder = new Encoder()
protected decoder = new Decoder()
protected phoneCheckInterval = undefined
@@ -93,7 +91,6 @@ export class WAConnection extends EventEmitter {
protected mediaConn: MediaConnInfo
protected debounceTimeout: NodeJS.Timeout
protected rejectPendingConnection = (e: Error) => {}
constructor () {
super ()
@@ -181,14 +178,22 @@ export class WAConnection extends EventEmitter {
if (!this.phoneCheckInterval && requiresPhoneConnection) {
this.startPhoneCheckInterval ()
}
let onRecv: (json) => void
let onErr: (err) => void
try {
const result = await Utils.promiseTimeout(timeoutMs,
(resolve, reject) => (this.callbacks[tag] = { queryJSON: json, callback: resolve, errCallback: reject }),
(resolve, reject) => {
onRecv = resolve
onErr = ({reason}) => reject(new Error(reason))
this.once (`TAG:${tag}`, onRecv)
this.once ('ws-close', onErr) // if the socket closes, you'll never receive the message
},
)
return result as any
} finally {
requiresPhoneConnection && this.clearPhoneCheckInterval ()
delete this.callbacks[tag]
this.off (`TAG:${tag}`, onRecv)
this.off (`ws-close`, onErr)
}
}
/** Generic function for action, set queries */
@@ -252,6 +257,7 @@ export class WAConnection extends EventEmitter {
this.phoneCheckInterval && clearInterval (this.phoneCheckInterval)
this.phoneCheckInterval = undefined
}
/** checks for phone connection */
protected async sendAdminTest () {
return this.sendJSON (['admin', 'test'])
}
@@ -282,7 +288,10 @@ export class WAConnection extends EventEmitter {
}
protected startDebouncedTimeout () {
this.stopDebouncedTimeout ()
this.debounceTimeout = setTimeout (() => this.rejectPendingConnection(TimedOutError()), this.connectOptions.maxIdleTimeMs)
this.debounceTimeout = setTimeout (
() => this.emit('ws-close', { reason: DisconnectReason.timedOut }),
this.connectOptions.maxIdleTimeMs
)
}
protected stopDebouncedTimeout () {
this.debounceTimeout && clearTimeout (this.debounceTimeout)
@@ -371,7 +380,8 @@ export class WAConnection extends EventEmitter {
this.keepAliveReq && clearInterval(this.keepAliveReq)
this.clearPhoneCheckInterval ()
this.rejectPendingConnection && this.rejectPendingConnection (new Error('close'))
this.emit ('ws-close', { reason: DisconnectReason.close })
//this.rejectPendingConnection && this.rejectPendingConnection (new Error('close'))
try {
this.conn?.close()
@@ -379,15 +389,9 @@ export class WAConnection extends EventEmitter {
} catch {
}
this.conn = null
this.lastSeen = null
this.conn = undefined
this.lastSeen = undefined
this.msgCount = 0
Object.keys(this.callbacks).forEach(key => {
this.logger.debug (`cancelling message wait: ${key}`)
this.callbacks[key].errCallback(new Error('close'))
delete this.callbacks[key]
})
}
/**
* Does a fetch request with the configuration of the connection

View File

@@ -215,9 +215,9 @@ export class WAConnection extends Base {
emitQR ()
regenQR ()
} catch (error) {
this.logger.error ({ error }, `error in QR gen`)
this.logger.warn ({ error }, `error in QR gen`)
if (error.status === 429) { // too many QR requests
this.rejectPendingConnection (error)
this.emit ('ws-close', { reason: error.message })
}
}
}, this.connectOptions.regenerateQRIntervalMs)

View File

@@ -6,6 +6,7 @@ import WS from 'ws'
import KeyedDB from '@adiwajshing/keyed-db'
const DEF_CALLBACK_PREFIX = 'CB:'
const DEF_TAG_PREFIX = 'TAG:'
export class WAConnection extends Base {
/** Connect to WhatsApp Web */
@@ -45,7 +46,6 @@ export class WAConnection extends Base {
this.closeInternal (reason)
}
if (!willReconnect) throw error
this.emit ('intermediate-close', {reason})
}
}
@@ -61,12 +61,12 @@ export class WAConnection extends Base {
}
/** Meat of the connect logic */
protected async connectInternal (options: WAConnectOptions, delayMs?: number) {
const rejections: ((e?: Error) => void)[] = []
const rejectAll = (e: Error) => rejections.forEach (r => r(e))
// actual connect
const connect = () => {
let cancel: () => void
const task = new Promise((resolve, reject) => {
let rejectSafe = error => reject (error)
cancel = () => reject (CancelledError())
const connect = () => (
new Promise((resolve, reject) => {
rejections.push (reject)
// determine whether reconnect should be used or not
const shouldUseReconnect = (this.lastDisconnectReason === DisconnectReason.close ||
this.lastDisconnectReason === DisconnectReason.lost) &&
@@ -93,69 +93,46 @@ export class WAConnection extends Base {
let waitForChats: Promise<{[k: string]: Partial<WAChat>}>
// add wait for chats promise if required
if (typeof options?.waitForChats === 'undefined' ? true : options?.waitForChats) {
const recv = this.receiveChatsAndContacts(this.connectOptions.waitOnlyForLastMessage)
waitForChats = recv.waitForChats
rejectSafe = e => {
recv.cancelChats ()
reject (e)
}
const {waitForChats: wPromise, cancelChats} = this.receiveChatsAndContacts(this.connectOptions.waitOnlyForLastMessage)
waitForChats = wPromise
rejections.push (cancelChats)
}
try {
this.rejectPendingConnection = rejectSafe
const [, result] = await Promise.all ([
this.authenticate(reconnectID)
.then(() => {
this.startKeepAliveRequest()
this.conn
.removeAllListeners ('error')
.removeAllListeners ('close')
}),
waitForChats || Promise.resolve({})
])
const [, result] = await Promise.all (
[
this.authenticate(reconnectID)
.then(() => this.startKeepAliveRequest()),
waitForChats || undefined
]
)
this.conn
.removeAllListeners ('error')
.removeAllListeners ('close')
this.stopDebouncedTimeout ()
resolve (result)
} catch (error) {
reject (error)
}
})
this.conn.on('error', rejectSafe)
this.conn.on('close', () => rejectSafe(new Error('close')))
this.conn.on('error', rejectAll)
this.conn.on('close', () => rejectAll(new Error(DisconnectReason.close)))
}) as Promise<void | { [k: string]: Partial<WAChat> }>
)
return { promise: task, cancel: cancel }
}
let promise = Promise.resolve ()
let cancellations: (() => void)[] = []
const cancel = () => cancellations.forEach (cancel => cancel())
this.on ('close', cancel)
if (delayMs) {
const {delay, cancel} = Utils.delayCancellable (delayMs)
promise = delay
cancellations.push (cancel)
}
this.on ('ws-close', rejectAll)
try {
await promise
const result = connect ()
cancellations.push (result.cancel)
const final = await result.promise
return final
if (delayMs) {
const {delay, cancel} = Utils.delayCancellable (delayMs)
rejections.push (cancel)
await delay
}
const result = await connect ()
return result
} catch (error) {
this.endConnection ()
throw error
} finally {
cancel()
this.rejectPendingConnection = null
this.off('close', cancel)
this.off ('ws-close', rejectAll)
}
}
/**
@@ -309,7 +286,7 @@ export class WAConnection extends Base {
this.logger.error ({ error }, `encountered error in decrypting message, closing: ${error}`)
if (this.state === 'open') this.unexpectedDisconnect (DisconnectReason.badSession)
else this.rejectPendingConnection (new Error(DisconnectReason.badSession))
else this.emit ('ws-close', new Error(DisconnectReason.badSession))
}
if (this.shouldLogMessages) this.messageLog.push ({ tag: messageTag, json: JSON.stringify(json), fromMe: false })
@@ -318,17 +295,11 @@ export class WAConnection extends Base {
if (this.logger.level === 'trace') {
this.logger.trace(messageTag + ', ' + JSON.stringify(json))
}
/* Check if this is a response to a message we sent */
if (this.callbacks[messageTag]) {
const q = this.callbacks[messageTag]
q.callback(json)
delete this.callbacks[messageTag]
return
}
/*
Check if this is a response to a message we are expecting
*/
let anyTriggered = false
/* Check if this is a response to a message we sent */
anyTriggered = this.emit (`${DEF_TAG_PREFIX}${messageTag}`, json)
/* Check if this is a response to a message we are expecting */
const l0 = json[0] || ''
const l1 = typeof json[1] !== 'object' || json[1] === null ? {} : json[1]
const l2 = ((json[2] || [])[0] || [])[0] || ''

View File

@@ -334,8 +334,8 @@ export class WAConnection extends Base {
on (event: 'connecting', listener: () => void): this
/** when the connection has closed */
on (event: 'close', listener: (err: {reason?: DisconnectReason | string, isReconnecting: boolean}) => void): this
/** when the connection has closed */
on (event: 'intermediate-close', listener: (err: {reason?: DisconnectReason | string}) => void): this
/** when the socket is closed */
on (event: 'ws-close', listener: (err: {reason?: DisconnectReason | string}) => void): this
/** when WA updates the credentials */
on (event: 'credentials-updated', listener: (auth: AuthenticationCredentials) => void): this
/** when a new QR is generated, ready for scanning */

View File

@@ -186,7 +186,6 @@ export class WAConnection extends Base {
this.emit ('chat-update', { jid, [type]: chat[type] })
}
}
return response
}
}

View File

@@ -417,7 +417,7 @@ export type BaileysEvent =
'open' |
'connecting' |
'close' |
'intermediate-close' |
'ws-close' |
'qr' |
'connection-phone-change' |
'user-presence-update' |