mirror of
https://github.com/FranP-code/Baileys.git
synced 2025-10-13 00:32:22 +00:00
More reliable connect with automatic retries + default connect options
This commit is contained in:
@@ -1,83 +1,143 @@
|
||||
import * as Utils from './Utils'
|
||||
import { WAMessage, WAChat, WAContact, MessageLogLevel, WANode, KEEP_ALIVE_INTERVAL_MS, BaileysError, WAConnectOptions, DisconnectReason } from './Constants'
|
||||
import { WAMessage, WAChat, MessageLogLevel, WANode, KEEP_ALIVE_INTERVAL_MS, BaileysError, WAConnectOptions, DisconnectReason, UNAUTHORIZED_CODES, WAContact, TimedOutError } from './Constants'
|
||||
import {WAConnection as Base} from './1.Validation'
|
||||
import Decoder from '../Binary/Decoder'
|
||||
|
||||
export class WAConnection extends Base {
|
||||
/**
|
||||
* Connect to WhatsAppWeb
|
||||
* @param options the connect options
|
||||
*/
|
||||
async connect(options: WAConnectOptions = {}) {
|
||||
/** Connect to WhatsApp Web */
|
||||
async connect() {
|
||||
// if we're already connected, throw an error
|
||||
if (this.state !== 'close') throw new Error('cannot connect when state=' + this.state)
|
||||
|
||||
const options = this.connectOptions
|
||||
|
||||
this.state = 'connecting'
|
||||
this.emit ('connecting')
|
||||
|
||||
const { ws, cancel } = Utils.openWebSocketConnection (5000, typeof options?.retryOnNetworkErrors === 'undefined' ? true : options?.retryOnNetworkErrors)
|
||||
const promise = Utils.promiseTimeout(options?.timeoutMs, (resolve, reject) => {
|
||||
ws
|
||||
.then (conn => this.conn = conn)
|
||||
.then (() => this.conn.on('message', data => this.onMessageRecieved(data as any)))
|
||||
.then (() => this.log(`connected to WhatsApp Web server, authenticating via ${options.reconnectID ? 'reconnect' : 'takeover'}`, MessageLogLevel.info))
|
||||
.then (() => this.authenticate(options?.reconnectID))
|
||||
.then (() => {
|
||||
this.conn.removeAllListeners ('error')
|
||||
this.conn.removeAllListeners ('close')
|
||||
this.conn.on ('close', () => this.unexpectedDisconnect (DisconnectReason.close))
|
||||
})
|
||||
.then (resolve)
|
||||
.catch (reject)
|
||||
})
|
||||
.catch (err => {
|
||||
cancel ()
|
||||
throw err
|
||||
}) as Promise<void>
|
||||
let tries = 0
|
||||
while (this.state === 'connecting') {
|
||||
tries += 1
|
||||
try {
|
||||
// if the first try failed, delay & connect again
|
||||
await this.connectInternal (options, tries > 1 && 2000)
|
||||
|
||||
this.phoneConnected = true
|
||||
this.state = 'open'
|
||||
} catch (error) {
|
||||
const loggedOut = error instanceof BaileysError && UNAUTHORIZED_CODES.includes(error.status)
|
||||
const willReconnect = !loggedOut && (tries <= (options?.maxRetries || 5)) && this.state === 'connecting'
|
||||
|
||||
this.log (`connect attempt ${tries} failed: ${error}${ willReconnect ? ', retrying...' : ''}`, MessageLogLevel.info)
|
||||
|
||||
if ((this.state as string) !== 'close' && !willReconnect) {
|
||||
this.closeInternal (loggedOut ? DisconnectReason.invalidSession : error.message)
|
||||
}
|
||||
|
||||
if (!willReconnect) throw error
|
||||
}
|
||||
}
|
||||
|
||||
this.emit ('open')
|
||||
|
||||
this.releasePendingRequests ()
|
||||
this.startKeepAliveRequest()
|
||||
|
||||
this.log ('opened connection to WhatsApp Web', MessageLogLevel.info)
|
||||
|
||||
this.conn.on ('close', () => this.unexpectedDisconnect (DisconnectReason.close))
|
||||
|
||||
return this
|
||||
|
||||
}
|
||||
/** Meat of the connect logic */
|
||||
protected async connectInternal (options: WAConnectOptions, delayMs?: number) {
|
||||
// actual connect
|
||||
const connect = () => {
|
||||
const tasks: Promise<void>[] = []
|
||||
const timeoutMs = options?.timeoutMs || 60*1000
|
||||
|
||||
const { ws, cancel } = Utils.openWebSocketConnection (5000, false)
|
||||
|
||||
let cancelTask: () => void
|
||||
if (typeof options?.waitForChats === 'undefined' ? true : options?.waitForChats) {
|
||||
|
||||
const {waitForChats, cancelChats} = this.receiveChatsAndContacts(timeoutMs, true)
|
||||
tasks.push (waitForChats)
|
||||
|
||||
cancellations.push (cancelChats)
|
||||
cancelTask = () => { cancelChats(); cancel() }
|
||||
} else cancelTask = cancel
|
||||
|
||||
// determine whether reconnect should be used or not
|
||||
const shouldUseReconnect = this.lastDisconnectReason !== DisconnectReason.replaced &&
|
||||
this.lastDisconnectReason !== DisconnectReason.unknown &&
|
||||
this.lastDisconnectReason !== DisconnectReason.intentional && this.user
|
||||
const reconnectID = shouldUseReconnect ? this.user.jid.replace ('@s.whatsapp.net', '@c.us') : null
|
||||
|
||||
const promise = Utils.promiseTimeout(timeoutMs, (resolve, reject) => {
|
||||
ws
|
||||
.then (conn => this.conn = conn)
|
||||
.then (() => this.conn.on('message', data => this.onMessageRecieved(data as any)))
|
||||
.then (() => (
|
||||
this.log(`connected to WhatsApp Web server, authenticating via ${reconnectID ? 'reconnect' : 'takeover'}`, MessageLogLevel.info)
|
||||
))
|
||||
.then (() => this.authenticate(reconnectID))
|
||||
.then (() => (
|
||||
this.conn
|
||||
.removeAllListeners ('error')
|
||||
.removeAllListeners('close')
|
||||
))
|
||||
.then (resolve)
|
||||
.catch (reject)
|
||||
})
|
||||
.catch (err => {
|
||||
this.removePendingCallbacks ()
|
||||
throw err
|
||||
}) as Promise<void>
|
||||
|
||||
tasks.push (promise)
|
||||
|
||||
return {
|
||||
promise: Promise.all (tasks),
|
||||
cancel: cancelTask
|
||||
}
|
||||
}
|
||||
|
||||
let promise = Promise.resolve ()
|
||||
let cancellations: (() => void)[] = []
|
||||
|
||||
const cancel = () => cancellations.forEach (cancel => cancel())
|
||||
|
||||
this.on ('close', cancel)
|
||||
|
||||
try {
|
||||
const tasks = [promise]
|
||||
|
||||
const waitForChats = typeof options?.waitForChats === 'undefined' ? true : options?.waitForChats
|
||||
if (waitForChats) tasks.push (this.receiveChatsAndContacts(options?.timeoutMs, true))
|
||||
|
||||
await Promise.all (tasks)
|
||||
|
||||
this.phoneConnected = true
|
||||
this.state = 'open'
|
||||
|
||||
this.emit ('open')
|
||||
|
||||
this.startKeepAliveRequest()
|
||||
this.registerPhoneConnectionPoll ()
|
||||
this.releasePendingRequests ()
|
||||
|
||||
this.log ('opened connection to WhatsApp Web', MessageLogLevel.info)
|
||||
|
||||
return this
|
||||
} catch (error) {
|
||||
const loggedOut = error instanceof BaileysError && error.status === 401
|
||||
if (loggedOut && this.cancelReconnect) this.cancelReconnect ()
|
||||
|
||||
if ((this.state as string) !== 'close') {
|
||||
this.closeInternal (loggedOut ? 'invalid_session' : error.message)
|
||||
}
|
||||
|
||||
throw error
|
||||
} finally {
|
||||
this.off ('close', cancel)
|
||||
if (delayMs) {
|
||||
const {delay, cancel} = Utils.delayCancellable (delayMs)
|
||||
promise = delay
|
||||
cancellations.push (cancel)
|
||||
}
|
||||
|
||||
return promise
|
||||
.then (() => {
|
||||
const {promise, cancel} = connect ()
|
||||
cancellations.push (cancel)
|
||||
|
||||
return promise
|
||||
})
|
||||
.finally (() => {
|
||||
cancel()
|
||||
this.off('close', cancel)
|
||||
})
|
||||
}
|
||||
/**
|
||||
* Sets up callbacks to receive chats, contacts & messages.
|
||||
* Must be called immediately after connect
|
||||
* @returns [chats, contacts]
|
||||
*/
|
||||
protected async receiveChatsAndContacts(timeoutMs: number = null, stopAfterMostRecentMessage: boolean=false) {
|
||||
protected receiveChatsAndContacts(timeoutMs: number = null, stopAfterMostRecentMessage: boolean=false) {
|
||||
this.contacts = {}
|
||||
this.chats.clear ()
|
||||
|
||||
let receivedContacts = false
|
||||
let receivedMessages = false
|
||||
|
||||
let resolveTask: () => void
|
||||
@@ -89,7 +149,12 @@ export class WAConnection extends Base {
|
||||
this.deregisterCallback(['action', 'add:unread'])
|
||||
}
|
||||
this.deregisterCallback(['response', 'type:chat'])
|
||||
}
|
||||
this.deregisterCallback(['response', 'type:contacts'])
|
||||
}
|
||||
const checkForResolution = () => {
|
||||
if (receivedContacts && receivedMessages) resolveTask ()
|
||||
}
|
||||
|
||||
// wait for messages to load
|
||||
const chatUpdate = json => {
|
||||
receivedMessages = true
|
||||
@@ -104,7 +169,7 @@ export class WAConnection extends Base {
|
||||
})
|
||||
}
|
||||
// if received contacts before messages
|
||||
if (isLast) resolveTask ()
|
||||
if (isLast && receivedContacts) checkForResolution ()
|
||||
}
|
||||
|
||||
// wait for actual messages to load, "last" is the most recent message, "before" contains prior messages
|
||||
@@ -115,9 +180,10 @@ export class WAConnection extends Base {
|
||||
}
|
||||
// get chats
|
||||
this.registerCallback(['response', 'type:chat'], json => {
|
||||
if (json[1].duplicate) return
|
||||
if (json[1].duplicate || !json[2]) return
|
||||
|
||||
json[2]?.forEach(([item, chat]: [any, WAChat]) => {
|
||||
json[2]
|
||||
.forEach(([item, chat]: [any, WAChat]) => {
|
||||
if (!chat) {
|
||||
this.log (`unexpectedly got null chat: ${item}, ${chat}`, MessageLogLevel.info)
|
||||
return
|
||||
@@ -133,24 +199,46 @@ export class WAConnection extends Base {
|
||||
this.chats.insert (chat) // chats data (log json to see what it looks like)
|
||||
})
|
||||
|
||||
this.log (`received ${this.chats.all().length} chats`, MessageLogLevel.info)
|
||||
// if there are no chats
|
||||
if (this.chats.all().length === 0) {
|
||||
this.log (`received ${json[2].length} chats`, MessageLogLevel.info)
|
||||
if (json[2].length === 0) {
|
||||
receivedMessages = true
|
||||
resolveTask ()
|
||||
checkForResolution ()
|
||||
}
|
||||
})
|
||||
// get contacts
|
||||
this.registerCallback(['response', 'type:contacts'], json => {
|
||||
if (json[1].duplicate || !json[2]) return
|
||||
|
||||
receivedContacts = true
|
||||
|
||||
json[2].forEach(([type, contact]: ['user', WAContact]) => {
|
||||
if (!contact) return this.log (`unexpectedly got null contact: ${type}, ${contact}`, MessageLogLevel.info)
|
||||
|
||||
contact.jid = Utils.whatsappID (contact.jid)
|
||||
this.contacts[contact.jid] = contact
|
||||
})
|
||||
this.log (`received ${json[2].length} contacts`, MessageLogLevel.info)
|
||||
checkForResolution ()
|
||||
})
|
||||
|
||||
// wait for the chats & contacts to load
|
||||
await Utils.promiseTimeout (timeoutMs, (resolve, reject) => {
|
||||
resolveTask = resolve
|
||||
const rejectTask = (reason) => {
|
||||
reject (new Error(reason))
|
||||
this.off ('close', rejectTask)
|
||||
}
|
||||
this.on ('close', rejectTask)
|
||||
})
|
||||
const {delay, cancel} = Utils.delayCancellable (timeoutMs)
|
||||
|
||||
const waitForChats = Promise.race ([
|
||||
new Promise (resolve => resolveTask = resolve),
|
||||
delay.then (() => { throw TimedOutError() })
|
||||
])
|
||||
.then (() => (
|
||||
this.chats
|
||||
.all ()
|
||||
.forEach (chat => {
|
||||
const respectiveContact = this.contacts[chat.jid]
|
||||
chat.name = respectiveContact?.name || respectiveContact?.notify || chat.name
|
||||
})
|
||||
))
|
||||
.finally (deregisterCallbacks)
|
||||
|
||||
return { waitForChats, cancelChats: cancel }
|
||||
}
|
||||
private releasePendingRequests () {
|
||||
this.pendingRequests.forEach (({resolve}) => resolve()) // send off all pending request
|
||||
@@ -221,62 +309,27 @@ export class WAConnection extends Base {
|
||||
}
|
||||
/** Send a keep alive request every X seconds, server updates & responds with last seen */
|
||||
private startKeepAliveRequest() {
|
||||
this.keepAliveReq && clearInterval (this.keepAliveReq)
|
||||
|
||||
this.keepAliveReq = setInterval(() => {
|
||||
const diff = (new Date().getTime() - this.lastSeen.getTime())
|
||||
if (!this.lastSeen) this.lastSeen = new Date ()
|
||||
const diff = new Date().getTime() - this.lastSeen.getTime()
|
||||
/*
|
||||
check if it's been a suspicious amount of time since the server responded with our last seen
|
||||
it could be that the network is down
|
||||
*/
|
||||
if (diff > KEEP_ALIVE_INTERVAL_MS+5000) this.unexpectedDisconnect (DisconnectReason.lost)
|
||||
else this.send ('?,,') // if its all good, send a keep alive request
|
||||
}, KEEP_ALIVE_INTERVAL_MS)
|
||||
}
|
||||
protected async reconnectLoop () {
|
||||
this.cancelledReconnect = false
|
||||
try {
|
||||
while (true) {
|
||||
const {delay, cancel} = Utils.delayCancellable (2500)
|
||||
this.cancelReconnect = () => {
|
||||
this.cancelledReconnect = true
|
||||
this.cancelReconnect = null
|
||||
cancel ()
|
||||
}
|
||||
|
||||
await delay
|
||||
else if (this.conn) this.send ('?,,') // if its all good, send a keep alive request
|
||||
|
||||
try {
|
||||
// if an external connect causes the connection to be open
|
||||
if (this.state === 'open') break
|
||||
|
||||
const shouldUseReconnect = this.lastDisconnectReason !== DisconnectReason.replaced && this.lastDisconnectReason !== DisconnectReason.unknown && this.user
|
||||
const reconnectID = shouldUseReconnect ? this.user.jid.replace ('@s.whatsapp.net', '@c.us') : null
|
||||
|
||||
await this.connect ({ timeoutMs: 30000, retryOnNetworkErrors: true, reconnectID })
|
||||
this.cancelReconnect = null
|
||||
break
|
||||
} catch (error) {
|
||||
// don't continue reconnecting if error is 401
|
||||
if (error instanceof BaileysError && error.status === 401) {
|
||||
break
|
||||
}
|
||||
this.log (`error in reconnecting: ${error}, reconnecting...`, MessageLogLevel.info)
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
|
||||
}
|
||||
}
|
||||
protected registerPhoneConnectionPoll () {
|
||||
this.phoneCheck = setInterval (() => {
|
||||
this.checkPhoneConnection (5000) // 5000 ms for timeout
|
||||
// poll phone connection as well,
|
||||
// 5000 ms for timeout
|
||||
this.checkPhoneConnection (5000)
|
||||
.then (connected => {
|
||||
if (this.phoneConnected != connected) {
|
||||
this.emit ('connection-phone-change', {connected})
|
||||
}
|
||||
this.phoneConnected !== connected && this.emit ('connection-phone-change', {connected})
|
||||
this.phoneConnected = connected
|
||||
})
|
||||
.catch (error => this.log(`error in getting phone connection: ${error}`, MessageLogLevel.info))
|
||||
}, 15000)
|
||||
|
||||
}, KEEP_ALIVE_INTERVAL_MS)
|
||||
}
|
||||
/**
|
||||
* Check if your phone is connected
|
||||
|
||||
Reference in New Issue
Block a user