mirror of
https://github.com/FranP-code/Baileys.git
synced 2025-10-13 00:32:22 +00:00
better connections
This commit is contained in:
@@ -1,50 +1,45 @@
|
||||
import WS from 'ws'
|
||||
import * as Utils from './Utils'
|
||||
import { WAMessage, WAChat, WAContact, MessageLogLevel, WANode, KEEP_ALIVE_INTERVAL_MS, BaileysError } from './Constants'
|
||||
import { WAMessage, WAChat, WAContact, MessageLogLevel, WANode, KEEP_ALIVE_INTERVAL_MS, BaileysError, WAConnectOptions } from './Constants'
|
||||
import {WAConnection as Base} from './1.Validation'
|
||||
import Decoder from '../Binary/Decoder'
|
||||
|
||||
export class WAConnection extends Base {
|
||||
/**
|
||||
* Connect to WhatsAppWeb
|
||||
* @param timeoutMs timeout after which the connect will fail, set to null for an infinite timeout
|
||||
* @param waitForChats should the chats be waited for
|
||||
* @param options the connect options
|
||||
*/
|
||||
async connect(timeoutMs: number = null, waitForChats: boolean = true) {
|
||||
async connect(options: WAConnectOptions = {}) {
|
||||
// if we're already connected, throw an error
|
||||
if (this.state !== 'closed') throw new Error('cannot connect when state=' + this.state)
|
||||
if (this.state !== 'close') throw new Error('cannot connect when state=' + this.state)
|
||||
|
||||
this.state = 'connecting'
|
||||
this.emit ('connecting')
|
||||
|
||||
this.conn = new WS('wss://web.whatsapp.com/ws', null, { origin: 'https://web.whatsapp.com' })
|
||||
|
||||
const promise: Promise<void> = Utils.promiseTimeout(timeoutMs, (resolve, reject) => {
|
||||
this.conn.on('open', () => {
|
||||
this.log('connected to WhatsApp Web server, authenticating...', MessageLogLevel.info)
|
||||
// start sending keep alive requests (keeps the WebSocket alive & updates our last seen)
|
||||
this.authenticate()
|
||||
.then(() => {
|
||||
this.startKeepAliveRequest()
|
||||
|
||||
this.conn.removeAllListeners ('error')
|
||||
this.conn.removeAllListeners ('close')
|
||||
this.conn.on ('close', () => this.unexpectedDisconnect ('closed'))
|
||||
|
||||
this.state = 'open'
|
||||
resolve()
|
||||
})
|
||||
.catch(reject)
|
||||
const { ws, cancel } = Utils.openWebSocketConnection (5000, typeof options?.retryOnNetworkErrors === 'undefined' ? true : options?.retryOnNetworkErrors)
|
||||
const promise: Promise<void> = Utils.promiseTimeout(options?.timeoutMs, (resolve, reject) => {
|
||||
ws
|
||||
.then (conn => this.conn = conn)
|
||||
.then (() => this.conn.on('message', data => this.onMessageRecieved(data as any)))
|
||||
.then (() => this.log('connected to WhatsApp Web server, authenticating...', MessageLogLevel.info))
|
||||
.then (() => this.authenticate())
|
||||
.then (() => {
|
||||
this.startKeepAliveRequest()
|
||||
this.conn.removeAllListeners ('error')
|
||||
this.conn.removeAllListeners ('close')
|
||||
this.conn.on ('close', () => this.unexpectedDisconnect ('close'))
|
||||
})
|
||||
.then (resolve)
|
||||
.catch (err => {
|
||||
cancel ()
|
||||
reject (err)
|
||||
})
|
||||
this.conn.on('message', m => this.onMessageRecieved(m))
|
||||
// if there was an error in the WebSocket
|
||||
this.conn.on('error', reject)
|
||||
this.conn.on('close', () => reject(new Error('closed')))
|
||||
})
|
||||
|
||||
try {
|
||||
await promise
|
||||
waitForChats && await this.receiveChatsAndContacts(timeoutMs, true)
|
||||
|
||||
const waitForChats = typeof options?.waitForChats === 'undefined' ? true : options?.waitForChats
|
||||
waitForChats && await this.receiveChatsAndContacts(options?.timeoutMs, true)
|
||||
|
||||
this.phoneConnected = true
|
||||
this.state = 'open'
|
||||
@@ -55,9 +50,12 @@ export class WAConnection extends Base {
|
||||
|
||||
this.releasePendingRequests ()
|
||||
this.log ('opened connection to WhatsApp Web', MessageLogLevel.info)
|
||||
|
||||
return this
|
||||
} catch (error) {
|
||||
this.closeInternal (error.message)
|
||||
const loggedOut = error instanceof BaileysError && error.status >= 400
|
||||
if (loggedOut && this.cancelReconnect) this.cancelReconnect ()
|
||||
this.closeInternal (loggedOut ? 'invalid_session' : error.message)
|
||||
throw error
|
||||
}
|
||||
}
|
||||
@@ -77,101 +75,93 @@ export class WAConnection extends Base {
|
||||
|
||||
let receivedContacts = false
|
||||
let receivedMessages = false
|
||||
let convoResolve: () => void
|
||||
|
||||
const waitForConvos = () =>
|
||||
Utils.promiseTimeout(timeoutMs, resolve => {
|
||||
convoResolve = () => {
|
||||
// de-register the callbacks, so that they don't get called again
|
||||
this.deregisterCallback(['action', 'add:last'])
|
||||
if (!stopAfterMostRecentMessage) {
|
||||
this.deregisterCallback(['action', 'add:before'])
|
||||
this.deregisterCallback(['action', 'add:unread'])
|
||||
}
|
||||
resolve()
|
||||
}
|
||||
const chatUpdate = json => {
|
||||
receivedMessages = true
|
||||
const isLast = json[1].last || stopAfterMostRecentMessage
|
||||
const messages = json[2] as WANode[]
|
||||
let resolveTask: () => void
|
||||
const deregisterCallbacks = () => {
|
||||
// wait for actual messages to load, "last" is the most recent message, "before" contains prior messages
|
||||
this.deregisterCallback(['action', 'add:last'])
|
||||
if (!stopAfterMostRecentMessage) {
|
||||
this.deregisterCallback(['action', 'add:before'])
|
||||
this.deregisterCallback(['action', 'add:unread'])
|
||||
}
|
||||
this.deregisterCallback(['response', 'type:chat'])
|
||||
this.deregisterCallback(['response', 'type:contacts'])
|
||||
}
|
||||
const checkForResolution = () => {
|
||||
if (receivedContacts && receivedMessages) resolveTask ()
|
||||
}
|
||||
|
||||
// wait for messages to load
|
||||
const chatUpdate = json => {
|
||||
receivedMessages = true
|
||||
const isLast = json[1].last || stopAfterMostRecentMessage
|
||||
const messages = json[2] as WANode[]
|
||||
|
||||
if (messages) {
|
||||
messages.reverse().forEach (([,, message]: ['message', null, WAMessage]) => {
|
||||
const jid = message.key.remoteJid
|
||||
const chat = this.chats.get(jid)
|
||||
chat?.messages.unshift (message)
|
||||
})
|
||||
}
|
||||
// if received contacts before messages
|
||||
if (isLast && receivedContacts) convoResolve ()
|
||||
}
|
||||
|
||||
// wait for actual messages to load, "last" is the most recent message, "before" contains prior messages
|
||||
this.registerCallback(['action', 'add:last'], chatUpdate)
|
||||
|
||||
if (!stopAfterMostRecentMessage) {
|
||||
this.registerCallback(['action', 'add:before'], chatUpdate)
|
||||
this.registerCallback(['action', 'add:unread'], chatUpdate)
|
||||
}
|
||||
})
|
||||
const waitForChats = async () => (
|
||||
Utils.promiseTimeout (timeoutMs, resolve => {
|
||||
this.registerCallback(['response', 'type:chat'], json => {
|
||||
if (json[1].duplicate || !json[2]) return
|
||||
|
||||
json[2]
|
||||
.forEach(([item, chat]: [any, WAChat]) => {
|
||||
if (!chat) {
|
||||
this.log (`unexpectedly got null chat: ${item}, ${chat}`, MessageLogLevel.info)
|
||||
return
|
||||
}
|
||||
chat.jid = Utils.whatsappID (chat.jid)
|
||||
chat.t = +chat.t
|
||||
chat.count = +chat.count
|
||||
chat.messages = []
|
||||
|
||||
const oldChat = this.chats.get(chat.jid)
|
||||
oldChat && this.chats.delete (oldChat)
|
||||
|
||||
this.chats.insert (chat) // chats data (log json to see what it looks like)
|
||||
})
|
||||
|
||||
this.deregisterCallback(['response', 'type:chat'])
|
||||
|
||||
this.log ('received chats list', MessageLogLevel.info)
|
||||
|
||||
if (this.chats.all().length > 0) waitForConvos().then (resolve)
|
||||
else resolve ()
|
||||
})
|
||||
})
|
||||
)
|
||||
const waitForContacts = async () => (
|
||||
new Promise (resolve => {
|
||||
this.registerCallback(['response', 'type:contacts'], json => {
|
||||
if (json[1].duplicate) return
|
||||
|
||||
receivedContacts = true
|
||||
|
||||
json[2].forEach(([type, contact]: ['user', WAContact]) => {
|
||||
if (!contact) return this.log (`unexpectedly got null contact: ${type}, ${contact}`, MessageLogLevel.info)
|
||||
|
||||
contact.jid = Utils.whatsappID (contact.jid)
|
||||
this.contacts[contact.jid] = contact
|
||||
})
|
||||
// if you receive contacts after messages
|
||||
// should probably resolve the promise
|
||||
if (receivedMessages) convoResolve()
|
||||
resolve ()
|
||||
|
||||
this.deregisterCallback(['response', 'type:contacts'])
|
||||
|
||||
this.log ('received contacts list', MessageLogLevel.info)
|
||||
if (messages) {
|
||||
messages.reverse().forEach (([,, message]: ['message', null, WAMessage]) => {
|
||||
const jid = message.key.remoteJid
|
||||
const chat = this.chats.get(jid)
|
||||
chat?.messages.unshift (message)
|
||||
})
|
||||
})
|
||||
)
|
||||
// wait for the chats & contacts to load
|
||||
await Promise.all( [waitForChats(), waitForContacts()] )
|
||||
}
|
||||
// if received contacts before messages
|
||||
if (isLast && receivedContacts) checkForResolution ()
|
||||
}
|
||||
|
||||
// wait for actual messages to load, "last" is the most recent message, "before" contains prior messages
|
||||
this.registerCallback(['action', 'add:last'], chatUpdate)
|
||||
if (!stopAfterMostRecentMessage) {
|
||||
this.registerCallback(['action', 'add:before'], chatUpdate)
|
||||
this.registerCallback(['action', 'add:unread'], chatUpdate)
|
||||
}
|
||||
|
||||
this.registerCallback(['response', 'type:chat'], json => {
|
||||
if (json[1].duplicate || !json[2]) return
|
||||
|
||||
json[2]
|
||||
.forEach(([item, chat]: [any, WAChat]) => {
|
||||
if (!chat) {
|
||||
this.log (`unexpectedly got null chat: ${item}, ${chat}`, MessageLogLevel.info)
|
||||
return
|
||||
}
|
||||
chat.jid = Utils.whatsappID (chat.jid)
|
||||
chat.t = +chat.t
|
||||
chat.count = +chat.count
|
||||
chat.messages = []
|
||||
|
||||
const oldChat = this.chats.get(chat.jid)
|
||||
oldChat && this.chats.delete (oldChat)
|
||||
|
||||
this.chats.insert (chat) // chats data (log json to see what it looks like)
|
||||
})
|
||||
|
||||
this.log ('received chats list', MessageLogLevel.info)
|
||||
})
|
||||
// get contacts
|
||||
this.registerCallback(['response', 'type:contacts'], json => {
|
||||
if (json[1].duplicate) return
|
||||
|
||||
receivedContacts = true
|
||||
|
||||
json[2].forEach(([type, contact]: ['user', WAContact]) => {
|
||||
if (!contact) return this.log (`unexpectedly got null contact: ${type}, ${contact}`, MessageLogLevel.info)
|
||||
|
||||
contact.jid = Utils.whatsappID (contact.jid)
|
||||
this.contacts[contact.jid] = contact
|
||||
})
|
||||
this.log ('received contacts list', MessageLogLevel.info)
|
||||
checkForResolution ()
|
||||
})
|
||||
// wait for the chats & contacts to load
|
||||
await Utils.promiseTimeout (timeoutMs, (resolve, reject) => {
|
||||
resolveTask = resolve
|
||||
const rejectTask = (reason) => {
|
||||
reject (new Error(reason))
|
||||
this.off ('close', rejectTask)
|
||||
}
|
||||
this.on ('close', rejectTask)
|
||||
}).finally (deregisterCallbacks)
|
||||
|
||||
this.chats.all ().forEach (chat => {
|
||||
const respectiveContact = this.contacts[chat.jid]
|
||||
chat.title = respectiveContact?.name || respectiveContact?.notify
|
||||
@@ -181,16 +171,15 @@ export class WAConnection extends Base {
|
||||
this.pendingRequests.forEach (({resolve}) => resolve()) // send off all pending request
|
||||
this.pendingRequests = []
|
||||
}
|
||||
private onMessageRecieved(message) {
|
||||
private onMessageRecieved(message: string | Buffer) {
|
||||
if (message[0] === '!') {
|
||||
// when the first character in the message is an '!', the server is updating the last seen
|
||||
const timestamp = message.slice(1, message.length)
|
||||
const timestamp = message.slice(1, message.length).toString ('utf-8')
|
||||
this.lastSeen = new Date(parseInt(timestamp))
|
||||
} else {
|
||||
const decrypted = Utils.decryptWA (message, this.authInfo.macKey, this.authInfo.encKey, new Decoder())
|
||||
if (!decrypted) {
|
||||
return
|
||||
}
|
||||
if (!decrypted) return
|
||||
|
||||
const [messageTag, json] = decrypted
|
||||
|
||||
if (this.logLevel === MessageLogLevel.all) {
|
||||
@@ -261,12 +250,16 @@ export class WAConnection extends Base {
|
||||
this.cancelledReconnect = false
|
||||
try {
|
||||
while (true) {
|
||||
const {delay, cancel} = Utils.delayCancellable (5000)
|
||||
this.cancelReconnect = cancel
|
||||
const {delay, cancel} = Utils.delayCancellable (2500)
|
||||
this.cancelReconnect = () => {
|
||||
this.cancelledReconnect = true
|
||||
this.cancelReconnect = null
|
||||
cancel ()
|
||||
}
|
||||
|
||||
await delay
|
||||
try {
|
||||
await this.connect ()
|
||||
await this.connect ({ timeoutMs: 30000, retryOnNetworkErrors: true })
|
||||
this.cancelReconnect = null
|
||||
break
|
||||
} catch (error) {
|
||||
|
||||
Reference in New Issue
Block a user