Callbacks via EventEmitter + possible memory leak fix

This commit is contained in:
Adhiraj Singh
2020-10-28 16:10:28 +05:30
parent 159b6d5cf5
commit cc722a5138
8 changed files with 106 additions and 161 deletions

View File

@@ -123,7 +123,7 @@ async function example() {
})
/* example of custom functionality for tracking battery */
conn.registerCallback(['action', null, 'battery'], json => {
conn.on('CB:action,,battery', json => {
const batteryLevelStr = json[2][0][1].value
const batterylevel = parseInt(batteryLevelStr)
console.log('battery level: ' + batterylevel)

View File

@@ -244,7 +244,7 @@ const options: MessageOptions = {mimetype: Mimetype.gif, caption: "hello!"} // s
conn.sendMessage(id, buffer, MessageType.video, options)
// send an audio file
const buffer = fs.readFileSync("Media/audio.mp3") // can send mp3, mp4, & ogg -- but for mp3 files the mimetype must be set to ogg
const options: MessageOptions = {mimetype: Mimetype.ogg} // some metadata (can't have caption in audio)
const options: MessageOptions = {mimetype: Mimetype.mp4Audio} // some metadata (can't have caption in audio)
conn.sendMessage(id, buffer, MessageType.audio, options)
```
@@ -496,7 +496,7 @@ This will enable you to see all sorts of messages WhatsApp sends in the console.
Hence, you can register a callback for an event using the following:
``` ts
conn.registerCallback (["action", null, "battery"], json => {
conn.on (["action", null, "battery"], json => {
const batteryLevelStr = json[2][0][1].value
const batterylevel = parseInt (batteryLevelStr)
console.log ("battery level: " + batterylevel + "%")
@@ -516,7 +516,7 @@ This will enable you to see all sorts of messages WhatsApp sends in the console.
Following this, one can implement the following callback:
``` ts
conn.registerCallback (["Conn", "pushname"], json => {
conn.on ('CB:Conn,pushname', json => {
const pushname = json[1].pushname
conn.user.name = pushname // update on client too
console.log ("Name updated: " + pushname)

View File

@@ -22,7 +22,7 @@
"build:all": "tsc && typedoc",
"build:docs": "typedoc",
"build:tsc": "tsc",
"example": "npx ts-node Example/example.ts",
"example": "node --inspect -r ts-node/register Example/example.ts",
"gen-protobuf": "ts-node src/Binary/GenerateStatics.ts",
"browser-decode": "npx ts-node src/BrowserMessageDecoding.ts"
},

View File

@@ -16,17 +16,18 @@ import {
WAConnectionState,
AnyAuthenticationCredentials,
WAContact,
WAChat,
WAQuery,
ReconnectMode,
WAConnectOptions,
MediaConnInfo,
DEFAULT_ORIGIN,
TimedOutError,
} from './Constants'
import { EventEmitter } from 'events'
import KeyedDB from '@adiwajshing/keyed-db'
import { STATUS_CODES, Agent } from 'http'
import pino from 'pino'
import { rejects } from 'assert'
const logger = pino({ prettyPrint: { levelFirst: true, ignore: 'hostname', translateTime: true }, prettifier: require('pino-pretty') })
@@ -81,7 +82,6 @@ export class WAConnection extends EventEmitter {
protected callbacks: {[k: string]: any} = {}
protected encoder = new Encoder()
protected decoder = new Decoder()
protected pendingRequests: {resolve: () => void, reject: (error) => void}[] = []
protected phoneCheckInterval = undefined
protected referenceDate = new Date () // used for generating tags
@@ -93,11 +93,11 @@ export class WAConnection extends EventEmitter {
protected mediaConn: MediaConnInfo
protected debounceTimeout: NodeJS.Timeout
protected onDebounceTimeout = () => {}
protected rejectPendingConnection = (e: Error) => {}
constructor () {
super ()
this.registerCallback (['Cmd', 'type:disconnect'], json => (
this.on ('CB:Cmd,type:disconnect', json => (
this.state === 'open' && this.unexpectedDisconnect(json[1].kind || 'unknown')
))
}
@@ -171,48 +171,6 @@ export class WAConnection extends EventEmitter {
}
return this
}
/**
* Register for a callback for a certain function
* @param parameters name of the function along with some optional specific parameters
*/
registerCallback(parameters: [string, string?, string?] | string, callback) {
if (typeof parameters === 'string') {
return this.registerCallback([parameters, null, null], callback)
}
if (!Array.isArray(parameters)) {
throw new Error('parameters (' + parameters + ') must be a string or array')
}
const func = 'function:' + parameters[0]
const key = parameters[1] || ''
const key2 = parameters[2] || ''
if (!this.callbacks[func]) {
this.callbacks[func] = {}
}
if (!this.callbacks[func][key]) {
this.callbacks[func][key] = {}
}
this.callbacks[func][key][key2] = callback
}
/**
* Cancel all further callback events associated with the given parameters
* @param parameters name of the function along with some optional specific parameters
*/
deregisterCallback(parameters: [string, string?, string?] | string) {
if (typeof parameters === 'string') {
return this.deregisterCallback([parameters])
}
if (!Array.isArray(parameters)) {
throw new Error('parameters (' + parameters + ') must be a string or array')
}
const func = 'function:' + parameters[0]
const key = parameters[1] || ''
const key2 = parameters[2] || ''
if (this.callbacks[func] && this.callbacks[func][key] && this.callbacks[func][key][key2]) {
delete this.callbacks[func][key][key2]
return
}
this.logger.warn('Could not find ' + JSON.stringify(parameters) + ' to deregister')
}
/**
* Wait for a message with a certain tag to be received
* @param tag the message tag to await
@@ -324,7 +282,7 @@ export class WAConnection extends EventEmitter {
}
protected startDebouncedTimeout () {
this.stopDebouncedTimeout ()
this.debounceTimeout = setTimeout (() => this.onDebounceTimeout(), this.connectOptions.maxIdleTimeMs)
this.debounceTimeout = setTimeout (() => this.rejectPendingConnection(TimedOutError()), this.connectOptions.maxIdleTimeMs)
}
protected stopDebouncedTimeout () {
this.debounceTimeout && clearTimeout (this.debounceTimeout)
@@ -352,7 +310,20 @@ export class WAConnection extends EventEmitter {
await Utils.promiseTimeout (
this.pendingRequestTimeoutMs,
(resolve, reject) => this.pendingRequests.push({resolve, reject})
(resolve, reject) => {
const onClose = ({ reason }) => {
if (reason === DisconnectReason.invalidSession || reason === DisconnectReason.intentional) {
reject (new Error(reason))
}
this.off ('open', onOpen)
}
const onOpen = () => {
resolve ()
this.off ('close', onClose)
}
this.once ('close', onClose)
this.once ('open', onOpen)
}
)
}
/**
@@ -381,11 +352,6 @@ export class WAConnection extends EventEmitter {
this.lastDisconnectTime = new Date ()
this.endConnection ()
if (reason === 'invalid_session' || reason === 'intentional') {
this.pendingRequests.forEach (({reject}) => reject(new Error(reason)))
this.pendingRequests = []
}
// reconnecting if the timeout is active for the reconnect loop
this.emit ('close', { reason, isReconnecting })
}
@@ -400,6 +366,8 @@ export class WAConnection extends EventEmitter {
this.keepAliveReq && clearInterval(this.keepAliveReq)
this.clearPhoneCheckInterval ()
this.rejectPendingConnection && this.rejectPendingConnection (new Error('close'))
try {
this.conn?.close()
//this.conn?.terminate()
@@ -411,11 +379,9 @@ export class WAConnection extends EventEmitter {
this.msgCount = 0
Object.keys(this.callbacks).forEach(key => {
if (!key.startsWith('function:')) {
this.logger.debug (`cancelling message wait: ${key}`)
this.callbacks[key].errCallback(new Error('close'))
delete this.callbacks[key]
}
this.logger.debug (`cancelling message wait: ${key}`)
this.callbacks[key].errCallback(new Error('close'))
delete this.callbacks[key]
})
}
/**

View File

@@ -5,6 +5,8 @@ import Decoder from '../Binary/Decoder'
import WS from 'ws'
import KeyedDB from '@adiwajshing/keyed-db'
const DEF_CALLBACK_PREFIX = 'CB:'
export class WAConnection extends Base {
/** Connect to WhatsApp Web */
async connect () {
@@ -50,8 +52,6 @@ export class WAConnection extends Base {
const updatedChats = !!this.lastDisconnectTime && updates
const result: WAOpenResult = { user: this.user, newConnection, updatedChats }
this.emit ('open', result)
this.releasePendingRequests ()
this.logger.info ('opened connection to WhatsApp Web')
@@ -65,6 +65,7 @@ export class WAConnection extends Base {
const connect = () => {
let cancel: () => void
const task = new Promise((resolve, reject) => {
let rejectSafe = error => reject (error)
cancel = () => reject (CancelledError())
// determine whether reconnect should be used or not
const shouldUseReconnect = (this.lastDisconnectReason === DisconnectReason.close ||
@@ -94,22 +95,24 @@ export class WAConnection extends Base {
if (typeof options?.waitForChats === 'undefined' ? true : options?.waitForChats) {
const recv = this.receiveChatsAndContacts(this.connectOptions.waitOnlyForLastMessage)
waitForChats = recv.waitForChats
cancel = () => {
reject (CancelledError())
rejectSafe = e => {
recv.cancelChats ()
reject (e)
}
}
try {
this.onDebounceTimeout = () => rejectSafe(TimedOutError())
await this.authenticate (reconnectID)
this.rejectPendingConnection = rejectSafe
const [, result] = await Promise.all ([
this.authenticate(reconnectID)
.then(() => {
this.startKeepAliveRequest()
this.startKeepAliveRequest()
this.conn
.removeAllListeners ('error')
.removeAllListeners ('close')
const result = waitForChats && (await waitForChats)
this.conn
.removeAllListeners ('error')
.removeAllListeners ('close')
}),
waitForChats || Promise.resolve({})
])
this.stopDebouncedTimeout ()
resolve (result)
@@ -117,7 +120,6 @@ export class WAConnection extends Base {
reject (error)
}
})
const rejectSafe = error => reject (error)
this.conn.on('error', rejectSafe)
this.conn.on('close', () => rejectSafe(new Error('close')))
@@ -145,13 +147,14 @@ export class WAConnection extends Base {
const result = connect ()
cancellations.push (result.cancel)
const final = await result.promise
const final = await result.promise
return final
} catch (error) {
this.endConnection ()
throw error
} finally {
cancel()
this.rejectPendingConnection = null
this.off('close', cancel)
}
}
@@ -167,19 +170,10 @@ export class WAConnection extends Base {
let receivedMessages = false
let resolveTask: () => void
const deregisterCallbacks = () => {
// wait for actual messages to load, "last" is the most recent message, "before" contains prior messages
this.deregisterCallback(['action', 'add:last'])
this.deregisterCallback(['action', 'add:before'])
this.deregisterCallback(['action', 'add:unread'])
this.deregisterCallback(['response', 'type:chat'])
this.deregisterCallback(['response', 'type:contacts'])
}
let rejectTask: (e: Error) => void
const checkForResolution = () => receivedContacts && receivedMessages && resolveTask ()
// wait for messages to load
const chatUpdate = json => {
const messagesUpdate = json => {
this.startDebouncedTimeout () // restart debounced timeout
receivedMessages = true
@@ -203,14 +197,7 @@ export class WAConnection extends Base {
// if received contacts before messages
if (isLast && receivedContacts) checkForResolution ()
}
// wait for actual messages to load, "last" is the most recent message, "before" contains prior messages
this.registerCallback(['action', 'add:last'], chatUpdate)
this.registerCallback(['action', 'add:before'], chatUpdate)
this.registerCallback(['action', 'add:unread'], chatUpdate)
// get chats
this.registerCallback(['response', 'type:chat'], json => {
const chatUpdate = json => {
if (json[1].duplicate || !json[2]) return
this.startDebouncedTimeout () // restart debounced timeout
@@ -235,9 +222,8 @@ export class WAConnection extends Base {
receivedMessages = true
checkForResolution ()
}
})
// get contacts
this.registerCallback(['response', 'type:contacts'], json => {
}
const contactsUpdate = json => {
if (json[1].duplicate || !json[2]) return
this.startDebouncedTimeout () // restart debounced timeout
@@ -251,15 +237,32 @@ export class WAConnection extends Base {
})
this.logger.info (`received ${json[2].length} contacts`)
checkForResolution ()
})
}
const registerCallbacks = () => {
// wait for actual messages to load, "last" is the most recent message, "before" contains prior messages
this.on(DEF_CALLBACK_PREFIX + 'action,add:last', messagesUpdate)
this.on(DEF_CALLBACK_PREFIX + 'action,add:before', messagesUpdate)
this.on(DEF_CALLBACK_PREFIX + 'action,add:unread', messagesUpdate)
// get chats
this.on(DEF_CALLBACK_PREFIX + 'response,type:chat', chatUpdate)
// get contacts
this.on(DEF_CALLBACK_PREFIX + 'response,type:contacts', contactsUpdate)
}
const deregisterCallbacks = () => {
this.off(DEF_CALLBACK_PREFIX + 'action,add:last', messagesUpdate)
this.off(DEF_CALLBACK_PREFIX + 'action,add:before', messagesUpdate)
this.off(DEF_CALLBACK_PREFIX + 'action,add:unread', messagesUpdate)
this.off(DEF_CALLBACK_PREFIX + 'response,type:chat', chatUpdate)
this.off(DEF_CALLBACK_PREFIX + 'response,type:contacts', contactsUpdate)
}
// wait for the chats & contacts to load
let cancelChats: () => void
const waitForChats = async () => {
try {
registerCallbacks ()
await new Promise ((resolve, reject) => {
resolveTask = resolve
cancelChats = () => reject (CancelledError())
rejectTask = reject
})
const oldChats = this.chats
@@ -287,11 +290,7 @@ export class WAConnection extends Base {
deregisterCallbacks ()
}
}
return { waitForChats: waitForChats (), cancelChats }
}
private releasePendingRequests () {
this.pendingRequests.forEach (({resolve}) => resolve()) // send off all pending request
this.pendingRequests = []
return { waitForChats: waitForChats (), cancelChats: () => rejectTask (CancelledError()) }
}
private onMessageRecieved(message: string | Buffer) {
if (message[0] === '!') {
@@ -309,9 +308,7 @@ export class WAConnection extends Base {
if (this.logger.level === 'trace') {
this.logger.trace(messageTag + ', ' + JSON.stringify(json))
}
/*
Check if this is a response to a message we sent
*/
/* Check if this is a response to a message we sent */
if (this.callbacks[messageTag]) {
const q = this.callbacks[messageTag]
q.callback(json)
@@ -321,38 +318,19 @@ export class WAConnection extends Base {
/*
Check if this is a response to a message we are expecting
*/
if (this.callbacks['function:' + json[0]]) {
const callbacks = this.callbacks['function:' + json[0]]
let callbacks2
let callback
for (const key in json[1] || {}) {
callbacks2 = callbacks[key + ':' + json[1][key]]
if (callbacks2) {
break
}
}
if (!callbacks2) {
for (const key in json[1] || {}) {
callbacks2 = callbacks[key]
if (callbacks2) {
break
}
}
}
if (!callbacks2) {
callbacks2 = callbacks['']
}
if (callbacks2) {
callback = callbacks2[json[2] && json[2][0][0]]
if (!callback) {
callback = callbacks2['']
}
}
if (callback) {
callback(json)
return
}
}
let anyTriggered = false
const l0 = json[0] || ''
const l1 = typeof json[1] !== 'object' ? {} : json[1]
const l2 = ((json[2] || [])[0] || [])[0] || ''
Object.keys(l1).forEach(key => {
anyTriggered = anyTriggered || this.emit (`${DEF_CALLBACK_PREFIX}${l0},${key}:${l1[key]},${l2}`, json)
console.log (`${DEF_CALLBACK_PREFIX}${l0},${key}:${l1[key]},${l2}`)
anyTriggered = anyTriggered || this.emit (`${DEF_CALLBACK_PREFIX}${l0},${key}:${l1[key]}`, json)
})
anyTriggered = anyTriggered || this.emit (`${DEF_CALLBACK_PREFIX}${l0},,${l2}`, json)
anyTriggered = anyTriggered || this.emit (`${DEF_CALLBACK_PREFIX}${l0}`, json)
if (anyTriggered) return
if (this.state === 'open' && json[0] === 'Pong') {
if (this.phoneConnected !== json[1]) {
this.phoneConnected = json[1]
@@ -367,7 +345,7 @@ export class WAConnection extends Base {
this.logger.error ({ error }, `encountered error in decrypting message, closing`)
if (this.state === 'open') this.unexpectedDisconnect (DisconnectReason.badSession)
else this.endConnection ()
else this.rejectPendingConnection (new Error(DisconnectReason.badSession))
}
}
}

View File

@@ -10,7 +10,7 @@ export class WAConnection extends Base {
constructor () {
super ()
// new messages
this.registerCallback(['action', 'add:relay', 'message'], json => {
this.on('CB:action,add:relay,message', json => {
const message = json[2][0][2] as WAMessage
const jid = whatsappID( message.key.remoteJid )
if (jid.endsWith('@s.whatsapp.net')) {
@@ -22,7 +22,7 @@ export class WAConnection extends Base {
this.chatAddMessageAppropriate (message)
})
// presence updates
this.registerCallback('Presence', json => {
this.on('CB:Presence', json => {
const update = json[1] as PresenceUpdate
const jid = whatsappID(update.id)
@@ -38,7 +38,7 @@ export class WAConnection extends Base {
this.emit('user-presence-update', update)
})
// If a message has been updated (usually called when a video message gets its upload url, or live locations)
this.registerCallback (['action', 'add:update', 'message'], json => {
this.on ('CB:action,add:update,message', json => {
const message: WAMessage = json[2][0][2]
const jid = whatsappID(message.key.remoteJid)
const chat = this.chats.get(jid)
@@ -55,7 +55,7 @@ export class WAConnection extends Base {
}
})
// If a user's contact has changed
this.registerCallback (['action', null, 'user'], json => {
this.on ('CB:action,,user', json => {
const node = json[2][0]
if (node) {
const user = node[1] as WAContact
@@ -71,7 +71,7 @@ export class WAConnection extends Base {
}
})
// chat archive, pin etc.
this.registerCallback(['action', null, 'chat'], json => {
this.on('CB:action,,chat', json => {
json = json[2][0]
const updateType = json[1].type
@@ -112,7 +112,7 @@ export class WAConnection extends Base {
}
})
// profile picture updates
this.registerCallback(['Cmd', 'type:picture'], async json => {
this.on('CB:Cmd,type:picture', async json => {
const jid = whatsappID(json[1].jid)
const chat = this.chats.get(jid)
if (!chat) return
@@ -121,12 +121,12 @@ export class WAConnection extends Base {
this.emit ('chat-update', { jid, imgUrl: chat.imgUrl })
})
// status updates
this.registerCallback(['Status'], async json => {
this.on('CB:Status', async json => {
const jid = whatsappID(json[1].id)
this.emit ('user-status-update', { jid, status: json[1].status })
})
// read updates
this.registerCallback (['action', null, 'read'], async json => {
this.on ('CB:action,,read', async json => {
const update = json[2][0][1]
const jid = whatsappID(update.jid)
const chat = this.chats.get (jid) || await this.chatAdd (jid)
@@ -136,7 +136,7 @@ export class WAConnection extends Base {
this.emit ('chat-update', { jid: chat.jid, count: chat.count })
})
this.registerCallback (['action', 'add:relay', 'received'], json => {
this.on ('CB:action,add:relay,received', json => {
json = json[2][0][1]
if (json.type === 'error') {
const update: WAMessageStatusUpdate = {
@@ -167,8 +167,8 @@ export class WAConnection extends Base {
}
this.forwardStatusUpdate (update)
}
this.registerCallback('Msg', func)
this.registerCallback('MsgInfo', func)
this.on('CB:Msg', func)
this.on('CB:MsgInfo', func)
this.on ('qr', qr => QR.generate(qr, { small: true }))
}
@@ -371,7 +371,8 @@ export class WAConnection extends Base {
/** when WA sends back a pong */
on (event: 'received-pong', listener: () => void): this
on (event: string, listener: (json: any) => void): this
on (event: BaileysEvent, listener: (...args: any[]) => void) { return super.on (event, listener) }
emit (event: BaileysEvent, ...args: any[]) { return super.emit (event, ...args) }
on (event: BaileysEvent | string, listener: (...args: any[]) => void) { return super.on (event, listener) }
emit (event: BaileysEvent | string, ...args: any[]) { return super.emit (event, ...args) }
}

View File

@@ -136,7 +136,7 @@ export function generateMessageID() {
}
export function decryptWA (message: string | Buffer, macKey: Buffer, encKey: Buffer, decoder: Decoder, fromMe: boolean=false): [string, Object, [number, number]?] {
let commaIndex = message.indexOf(',') // all whatsapp messages have a tag and a comma, followed by the actual message
if (commaIndex < 0) throw Error ('invalid message: ' + message) // if there was no comma, then this message must be not be valid
if (commaIndex < 0) throw new BaileysError ('invalid message', { message }) // if there was no comma, then this message must be not be valid
if (message[commaIndex+1] === ',') commaIndex += 1
let data = message.slice(commaIndex+1, message.length)
@@ -151,7 +151,7 @@ export function decryptWA (message: string | Buffer, macKey: Buffer, encKey: Buf
json = JSON.parse(data) // parse the JSON
} else {
if (!macKey || !encKey) {
throw new Error ('recieved encrypted buffer when auth creds unavailable: ' + message)
throw new BaileysError ('recieved encrypted buffer when auth creds unavailable', { message })
}
/*
If the data recieved was not a JSON, then it must be an encrypted message.

View File

@@ -12,7 +12,7 @@
"resolveJsonModule": true,
"forceConsistentCasingInFileNames": true,
"declaration": true,
"lib": ["es2019", "esnext.array"]
"lib": ["es2020", "esnext.array"]
},
"include": ["src/*/*.ts"],
"exclude": ["node_modules", "src/Tests/*", "src/Binary/GenerateStatics.ts"]