Major redo with respect to chats/contacts -- read desc

Waiting for chats & contacts is hella unreliable, so I've put them as events
1. receive chats via the `chats-received` event. If new chats are found, the flag for that is sent as well
2. receive contacts via the `contacts-received` event
3. When WA sends older messages, the `chats-update` or `chat-update` event is triggered
4. Baileys keeps track of all the changed conversations between connects

Connects almost always take less than 10 seconds!
This commit is contained in:
Adhiraj Singh
2020-11-13 23:15:16 +05:30
parent eace0c1795
commit 6d02d405a7
10 changed files with 232 additions and 196 deletions

View File

@@ -7,7 +7,7 @@ export const testJid = process.env.TEST_JID || '1234@s.whatsapp.net' // set TEST
export const makeConnection = () => {
const conn = new WAConnection()
conn.connectOptions.maxIdleTimeMs = 45_000
conn.connectOptions.maxIdleTimeMs = 15_000
conn.logger.level = 'debug'
let evCounts = {}

View File

@@ -1,6 +1,6 @@
import * as assert from 'assert'
import {WAConnection} from '../WAConnection/WAConnection'
import { AuthenticationCredentialsBase64, BaileysError, ReconnectMode, DisconnectReason } from '../WAConnection/Constants'
import { AuthenticationCredentialsBase64, BaileysError, ReconnectMode, DisconnectReason, WAChat } from '../WAConnection/Constants'
import { delay } from '../WAConnection/Utils'
import { assertChatDBIntegrity, makeConnection, testJid } from './Common'
@@ -291,14 +291,11 @@ describe ('Pending Requests', () => {
it ('should correctly send updates', async () => {
const conn = makeConnection ()
conn.pendingRequestTimeoutMs = null
conn.loadAuthInfo('./auth_info.json')
const task = new Promise(resolve => conn.once('chats-received', resolve))
await conn.connect ()
conn.close ()
const result0 = await conn.connect ()
assert.deepEqual (result0.updatedChats, {})
await task
conn.close ()
@@ -306,19 +303,18 @@ describe ('Pending Requests', () => {
oldChat.archive = 'true' // mark the first chat as archived
oldChat.modify_tag = '1234' // change modify tag to detect change
// close the socket after a few seconds second to see if updates are correct after a reconnect
setTimeout (() => conn['conn'].close(), 5000)
const promise = new Promise(resolve => conn.once('chats-update', resolve))
const result = await conn.connect ()
assert.ok (!result.newConnection)
const chat = result.updatedChats[oldChat.jid]
const chats = await promise as Partial<WAChat>[]
const chat = chats.find (c => c.jid === oldChat.jid)
assert.ok (chat)
assert.ok ('archive' in chat)
assert.equal (Object.keys(chat).length, 2)
assert.equal (Object.keys(result.updatedChats).length, 1)
assert.strictEqual (Object.keys(chat).length, 3)
assert.strictEqual (Object.keys(chats).length, 1)
conn.close ()
})

View File

@@ -32,7 +32,7 @@ const logger = pino({ prettyPrint: { levelFirst: true, ignore: 'hostname', trans
export class WAConnection extends EventEmitter {
/** The version of WhatsApp Web we're telling the servers we are */
version: [number, number, number] = [2, 2045, 15]
version: [number, number, number] = [2, 2045, 19]
/** The Browser we're telling the WhatsApp Web servers we are */
browserDescription: [string, string, string] = Utils.Browsers.baileys ('Chrome')
/** Metadata like WhatsApp id, name set on WhatsApp etc. */
@@ -43,9 +43,9 @@ export class WAConnection extends EventEmitter {
state: WAConnectionState = 'close'
connectOptions: WAConnectOptions = {
regenerateQRIntervalMs: 30_000,
maxIdleTimeMs: 40_000,
maxIdleTimeMs: 15_000,
waitOnlyForLastMessage: false,
waitForChats: true,
waitForChats: false,
maxRetries: 10,
connectCooldownMs: 4000,
phoneResponseTime: 15_000,
@@ -67,6 +67,7 @@ export class WAConnection extends EventEmitter {
maxCachedMessages = 50
loadProfilePicturesForChatsAutomatically = true
lastChatsReceived: Date
chats = new KeyedDB (Utils.waChatKey(false), value => value.jid)
contacts: { [k: string]: WAContact } = {}

View File

@@ -76,7 +76,13 @@ export class WAConnection extends Base {
this.emit ('connection-validated', this.user)
if (this.loadProfilePicturesForChatsAutomatically) {
const response = await this.query({ json: ['query', 'ProfilePicThumb', this.user.jid], waitForOpen: false, expect200: false, requiresPhoneConnection: false, startDebouncedTimeout: true })
const response = await this.query({
json: ['query', 'ProfilePicThumb', this.user.jid],
waitForOpen: false,
expect200: false,
requiresPhoneConnection: false,
startDebouncedTimeout: true
})
this.user.imgUrl = response?.eurl || ''
}

View File

@@ -1,9 +1,8 @@
import * as Utils from './Utils'
import { WAMessage, WAChat, WANode, KEEP_ALIVE_INTERVAL_MS, BaileysError, WAConnectOptions, DisconnectReason, UNAUTHORIZED_CODES, WAContact, TimedOutError, CancelledError, WAOpenResult, DEFAULT_ORIGIN, WS_URL } from './Constants'
import { KEEP_ALIVE_INTERVAL_MS, BaileysError, WAConnectOptions, DisconnectReason, UNAUTHORIZED_CODES, CancelledError, WAOpenResult, DEFAULT_ORIGIN, WS_URL } from './Constants'
import {WAConnection as Base} from './1.Validation'
import Decoder from '../Binary/Decoder'
import WS from 'ws'
import KeyedDB from '@adiwajshing/keyed-db'
const DEF_CALLBACK_PREFIX = 'CB:'
const DEF_TAG_PREFIX = 'TAG:'
@@ -22,7 +21,7 @@ export class WAConnection extends Base {
let tries = 0
let lastConnect = this.lastDisconnectTime
var updates
let updates: any
while (this.state === 'connecting') {
tries += 1
try {
@@ -48,9 +47,7 @@ export class WAConnection extends Base {
if (!willReconnect) throw error
}
}
const updatedChats = !!this.lastDisconnectTime && updates
const result: WAOpenResult = { user: this.user, newConnection, updatedChats }
const result: WAOpenResult = { user: this.user, newConnection, ...(updates || {}) }
this.emit ('open', result)
this.logger.info ('opened connection to WhatsApp Web')
@@ -73,7 +70,7 @@ export class WAConnection extends Base {
!this.connectOptions.alwaysUseTakeover
const reconnectID = shouldUseReconnect && this.user.jid.replace ('@s.whatsapp.net', '@c.us')
this.conn = new WS(WS_URL, null, {
this.conn = new WS(WS_URL, null, {
origin: DEFAULT_ORIGIN,
timeout: this.connectOptions.maxIdleTimeMs,
agent: options.agent,
@@ -90,12 +87,12 @@ export class WAConnection extends Base {
this.conn.on ('open', async () => {
this.logger.info(`connected to WhatsApp Web server, authenticating via ${reconnectID ? 'reconnect' : 'takeover'}`)
let waitForChats: Promise<{[k: string]: Partial<WAChat>}>
let waitForChats: Promise<any>
// add wait for chats promise if required
if (typeof options?.waitForChats === 'undefined' ? true : options?.waitForChats) {
const {wait, cancelChats} = this.receiveChatsAndContacts(this.connectOptions.waitOnlyForLastMessage)
const {wait, cancellations} = this.receiveChatsAndContacts(this.connectOptions.waitOnlyForLastMessage)
waitForChats = wait
rejections.push (cancelChats)
rejections.push (...cancellations)
}
try {
const [, result] = await Promise.all (
@@ -116,7 +113,7 @@ export class WAConnection extends Base {
})
this.conn.on('error', rejectAll)
this.conn.on('close', () => rejectAll(new Error(DisconnectReason.close)))
}) as Promise<void | { [k: string]: Partial<WAChat> }>
}) as Promise<void | any>
)
this.on ('ws-close', rejectAll)
@@ -140,135 +137,30 @@ export class WAConnection extends Base {
* Must be called immediately after connect
*/
protected receiveChatsAndContacts(waitOnlyForLast: boolean) {
const chats = new KeyedDB(this.chatOrderingKey, c => c.jid)
const contacts = {}
let receivedChats = false
let receivedContacts = false
let receivedMessages = false
let resolveTask: () => void
let rejectTask: (e: Error) => void
const checkForResolution = () => receivedContacts && receivedChats && receivedMessages && resolveTask ()
// wait for messages to load
const messagesUpdate = json => {
this.startDebouncedTimeout () // restart debounced timeout
const isLast = json[1].last || waitOnlyForLast
const messages = json[2] as WANode[]
if (messages) {
messages.reverse().forEach (([,, message]: ['message', null, WAMessage]) => {
const jid = message.key.remoteJid
const chat = chats.get(jid)
if (chat) {
const fm = chat.messages.all()[0]
const prevEpoch = (fm && fm['epoch']) || 0
message['epoch'] = prevEpoch-1
chat.messages.insert (message)
}
const rejectableWaitForEvent = (event: string) => {
let rejectTask = (_: Error) => {}
const task = new Promise((resolve, reject) => {
this.once (event, data => {
this.startDebouncedTimeout() // start timeout again
resolve(data)
})
}
if (isLast) receivedMessages = true
// if received contacts before messages
if (isLast && receivedContacts) checkForResolution ()
}
const chatUpdate = json => {
if (json[1].duplicate || !json[2]) return
this.startDebouncedTimeout () // restart debounced timeout
json[2]
.forEach(([item, chat]: [any, WAChat]) => {
if (!chat) {
this.logger.warn (`unexpectedly got null chat: ${item}`, chat)
return
}
chat.jid = Utils.whatsappID (chat.jid)
chat.t = +chat.t
chat.count = +chat.count
chat.messages = Utils.newMessagesDB()
// chats data (log json to see what it looks like)
!chats.get (chat.jid) && chats.insert (chat)
rejectTask = reject
})
this.logger.info (`received ${json[2].length} chats`)
receivedChats = true
if (json[2].length === 0) receivedMessages = true
checkForResolution ()
return { reject: rejectTask, task }
}
const contactsUpdate = json => {
if (json[1].duplicate || !json[2]) return
this.startDebouncedTimeout () // restart debounced timeout
const events = [ 'chats-received', 'contacts-received', 'CB:action,add:last' ]
if (!waitOnlyForLast) events.push('CB:action,add:before', 'CB:action,add:unread')
receivedContacts = true
json[2].forEach(([type, contact]: ['user', WAContact]) => {
if (!contact) return this.logger.info (`unexpectedly got null contact: ${type}`, contact)
contact.jid = Utils.whatsappID (contact.jid)
contacts[contact.jid] = contact
const cancellations = []
const wait = Promise.all (
events.map (ev => {
const {reject, task} = rejectableWaitForEvent(ev)
cancellations.push(reject)
return task
})
this.logger.info (`received ${json[2].length} contacts`)
checkForResolution ()
}
const registerCallbacks = () => {
// wait for actual messages to load, "last" is the most recent message, "before" contains prior messages
this.on(DEF_CALLBACK_PREFIX + 'action,add:last', messagesUpdate)
this.on(DEF_CALLBACK_PREFIX + 'action,add:before', messagesUpdate)
this.on(DEF_CALLBACK_PREFIX + 'action,add:unread', messagesUpdate)
// get chats
this.on(DEF_CALLBACK_PREFIX + 'response,type:chat', chatUpdate)
// get contacts
this.on(DEF_CALLBACK_PREFIX + 'response,type:contacts', contactsUpdate)
}
const deregisterCallbacks = () => {
this.off(DEF_CALLBACK_PREFIX + 'action,add:last', messagesUpdate)
this.off(DEF_CALLBACK_PREFIX + 'action,add:before', messagesUpdate)
this.off(DEF_CALLBACK_PREFIX + 'action,add:unread', messagesUpdate)
this.off(DEF_CALLBACK_PREFIX + 'response,type:chat', chatUpdate)
this.off(DEF_CALLBACK_PREFIX + 'response,type:contacts', contactsUpdate)
}
// wait for the chats & contacts to load
const wait = (async () => {
try {
registerCallbacks ()
await new Promise ((resolve, reject) => {
resolveTask = resolve
rejectTask = reject
})
const oldChats = this.chats
const updatedChats: { [k: string]: Partial<WAChat> } = {}
chats.all().forEach (chat => {
const respectiveContact = contacts[chat.jid]
chat.name = respectiveContact?.name || respectiveContact?.notify || chat.name
const oldChat = oldChats.get(chat.jid)
if (!oldChat) {
updatedChats[chat.jid] = chat
} else if (oldChat.t < chat.t || oldChat.modify_tag !== chat.modify_tag) {
const changes = Utils.shallowChanges (oldChat, chat)
delete changes.messages
updatedChats[chat.jid] = changes
}
})
this.chats = chats
this.contacts = contacts
return updatedChats
} finally {
deregisterCallbacks ()
}
})()
return { wait, cancelChats: () => rejectTask (CancelledError()) }
).then(([update]) => update as { hasNewChats: boolean })
return { wait, cancellations }
}
private onMessageRecieved(message: string | Buffer) {
if (message[0] === '!') {

View File

@@ -1,7 +1,7 @@
import * as QR from 'qrcode-terminal'
import { WAConnection as Base } from './3.Connect'
import { WAMessageStatusUpdate, WAMessage, WAContact, WAChat, WAMessageProto, WA_MESSAGE_STUB_TYPE, WA_MESSAGE_STATUS_TYPE, PresenceUpdate, BaileysEvent, DisconnectReason, WAOpenResult, Presence, AuthenticationCredentials, WAParticipantAction, WAGroupMetadata, WAUser } from './Constants'
import { whatsappID, unixTimestampSeconds, isGroupID, GET_MESSAGE_ID, WA_MESSAGE_ID, waMessageKey, newMessagesDB } from './Utils'
import { WAMessageStatusUpdate, WAMessage, WAContact, WAChat, WAMessageProto, WA_MESSAGE_STUB_TYPE, WA_MESSAGE_STATUS_TYPE, PresenceUpdate, BaileysEvent, DisconnectReason, WAOpenResult, Presence, AuthenticationCredentials, WAParticipantAction, WAGroupMetadata, WAUser, WANode } from './Constants'
import { whatsappID, unixTimestampSeconds, isGroupID, GET_MESSAGE_ID, WA_MESSAGE_ID, waMessageKey, newMessagesDB, shallowChanges } from './Utils'
import KeyedDB from '@adiwajshing/keyed-db'
import { Mutex } from './Mutex'
@@ -9,6 +9,123 @@ export class WAConnection extends Base {
constructor () {
super ()
// chats received
this.on('CB:response,type:chat', json => {
if (json[1].duplicate || !json[2]) return
const chats = new KeyedDB(this.chatOrderingKey, c => c.jid)
json[2].forEach(([item, chat]: [any, WAChat]) => {
if (!chat) {
this.logger.warn (`unexpectedly got null chat: ${item}`, chat)
return
}
chat.jid = whatsappID (chat.jid)
chat.t = +chat.t
chat.count = +chat.count
chat.messages = newMessagesDB()
// chats data (log json to see what it looks like)
!chats.get (chat.jid) && chats.insert (chat)
})
this.logger.info (`received ${json[2].length} chats`)
const oldChats = this.chats
const updatedChats = []
let hasNewChats = false
chats.all().forEach (chat => {
const respectiveContact = this.contacts[chat.jid]
chat.name = respectiveContact?.name || respectiveContact?.notify || chat.name
const oldChat = oldChats.get(chat.jid)
if (!oldChat) {
hasNewChats = true
} else {
chat.messages = oldChat.messages
if (oldChat.t !== chat.t || oldChat.modify_tag !== chat.modify_tag) {
const changes = shallowChanges (oldChat, chat)
delete changes.messages
updatedChats.push({ jid: chat.jid, ...changes })
}
}
})
this.chats = chats
this.lastChatsReceived = new Date()
updatedChats.length > 0 && this.emit('chats-update', updatedChats)
this.emit('chats-received', { hasNewChats })
})
// we store these last messages
const lastMessages = {}
// messages received
const messagesUpdate = (json, style: 'prepend' | 'append') => {
const messages = json[2] as WANode[]
if (messages) {
const updates: { [k: string]: KeyedDB<WAMessage, string> } = {}
messages.reverse().forEach (([,, message]: ['message', null, WAMessage]) => {
const jid = message.key.remoteJid
const chat = this.chats.get(jid)
const mKeyID = WA_MESSAGE_ID(message)
if (chat && !chat.messages.get(mKeyID)) {
if (style === 'prepend') {
const fm = chat.messages.get(lastMessages[jid])
if (!fm) return
const prevEpoch = fm['epoch']
message['epoch'] = prevEpoch-1
} else if (style === 'append') {
const lm = chat.messages.all()[chat.messages.length-1]
const prevEpoch = (lm && lm['epoch']) || 0
message['epoch'] = prevEpoch+100 // hacky way to allow more previous messages
}
chat.messages.insert (message)
updates[jid] = updates[jid] || newMessagesDB()
updates[jid].insert(message)
lastMessages[jid] = mKeyID
} else if (!chat) this.logger.debug({ jid }, `chat not found`)
})
if (Object.keys(updates).length > 0) {
this.emit ('chats-update',
Object.keys(updates).map(jid => ({ jid, messages: updates[jid] }))
)
}
}
}
this.on('CB:action,add:last', json => messagesUpdate(json, 'append'))
this.on('CB:action,add:before', json => messagesUpdate(json, 'prepend'))
this.on('CB:action,add:unread', json => messagesUpdate(json, 'prepend'))
// contacts received
this.on('CB:response,type:contacts', json => {
if (json[1].duplicate || !json[2]) return
const contacts: { [_: string]: WAContact } = {}
json[2].forEach(([type, contact]: ['user', WAContact]) => {
if (!contact) return this.logger.info (`unexpectedly got null contact: ${type}`, contact)
contact.jid = whatsappID (contact.jid)
contacts[contact.jid] = contact
})
// update chat names
const updatedChats = []
this.chats.all().forEach(c => {
const contact = contacts[c.jid]
if (contact) {
const name = contact?.name || contact?.notify || c.name
if (name !== c.name) {
updatedChats.push({ jid: c.jid, name })
}
}
})
updatedChats.length > 0 && this.emit('chats-update', updatedChats)
this.logger.info (`received ${json[2].length} contacts`)
this.contacts = contacts
this.emit('contacts-received')
})
// new messages
this.on('CB:action,add:relay,message', json => {
const message = json[2][0][2] as WAMessage
@@ -386,6 +503,12 @@ export class WAConnection extends Base {
on (event: 'user-status-update', listener: (update: {jid: string, status?: string}) => void): this
/** when a new chat is added */
on (event: 'chat-new', listener: (chat: WAChat) => void): this
/** when contacts are sent by WA */
on (event: 'contacts-received', listener: () => void): this
/** when chats are sent by WA */
on (event: 'chats-received', listener: (update: {hasNewChats: boolean}) => void): this
/** when multiple chats are updated (new message, updated message, deleted, pinned, etc) */
on (event: 'chats-update', listener: (chats: (Partial<WAChat> & { jid: string })[]) => void): this
/** when a chat is updated (new message, updated message, deleted, pinned, etc) */
on (event: 'chat-update', listener: (chat: Partial<WAChat> & { jid: string }) => void): this
/**
@@ -407,7 +530,7 @@ export class WAConnection extends Base {
/** when WA sends back a pong */
on (event: 'received-pong', listener: () => void): this
on (event: string, listener: (json: any) => void): this
on (event: BaileysEvent | string, listener: (json: any) => void): this
on (event: BaileysEvent | string, listener: (...args: any[]) => void) { return super.on (event, listener) }
emit (event: BaileysEvent | string, ...args: any[]) { return super.emit (event, ...args) }

View File

@@ -79,9 +79,15 @@ export type WAConnectOptions = {
maxIdleTimeMs?: number
/** maximum attempts to connect */
maxRetries?: number
/** should the chats be waited for */
/**
* @deprecated -- use the `chats-received` & `contacts-received` events
* should the chats be waited for
* */
waitForChats?: boolean
/** if set to true, the connect only waits for the last message of the chat */
/**
* @deprecated -- use the `chats-received` & `contacts-received` events
* if set to true, the connect only waits for the last message of the chat
* */
waitOnlyForLastMessage?: boolean
/** max time for the phone to respond to a connectivity test */
phoneResponseTime?: number
@@ -376,9 +382,7 @@ export interface WAOpenResult {
/** Was this connection opened via a QR scan */
newConnection: boolean
user: WAUser
updatedChats?: {
[k: string]: Partial<WAChat>
}
hasNewChats?: boolean
}
export enum GroupSettingChange {
@@ -423,6 +427,8 @@ export type BaileysEvent =
'connection-phone-change' |
'user-presence-update' |
'user-status-update' |
'contacts-received' |
'chats-received' |
'chat-new' |
'chat-update' |
'message-status-update' |

View File

@@ -41,7 +41,7 @@ export const isGroupID = (jid: string) => jid?.endsWith ('@g.us')
export const newMessagesDB = (messages: WAMessage[] = []) => {
const db = new KeyedDB(waMessageKey, WA_MESSAGE_ID)
messages.forEach(m => db.insert(m))
messages.forEach(m => !db.get(WA_MESSAGE_ID(m)) && db.insert(m))
return db
}