This commit is contained in:
Adhiraj
2020-09-05 18:31:56 +05:30
parent eebe34efa5
commit 9041272b5c
5 changed files with 47 additions and 85 deletions

View File

@@ -94,6 +94,10 @@ describe ('Reconnects', () => {
reason !== DisconnectReason.intentional && assert.fail ('should not have closed again') reason !== DisconnectReason.intentional && assert.fail ('should not have closed again')
)) ))
await delay (60*1000) await delay (60*1000)
const status = await conn.getStatus ()
assert.ok (status)
conn.close () conn.close ()
} }
/** /**
@@ -252,7 +256,7 @@ describe ('Pending Requests', () => {
}) })
it('should queue requests when closed', async () => { it('should queue requests when closed', async () => {
const conn = new WAConnection () const conn = new WAConnection ()
conn.pendingRequestTimeoutMs = null //conn.pendingRequestTimeoutMs = null
await conn.loadAuthInfo('./auth_info.json').connect () await conn.loadAuthInfo('./auth_info.json').connect ()

View File

@@ -221,7 +221,7 @@ export class WAConnection extends EventEmitter {
*/ */
async query({json, binaryTags, tag, timeoutMs, expect200, waitForOpen}: WAQuery) { async query({json, binaryTags, tag, timeoutMs, expect200, waitForOpen}: WAQuery) {
waitForOpen = typeof waitForOpen === 'undefined' ? true : waitForOpen waitForOpen = typeof waitForOpen === 'undefined' ? true : waitForOpen
await this.waitForConnection (waitForOpen) if (waitForOpen) await this.waitForConnection ()
if (binaryTags) tag = await this.sendBinary(json as WANode, binaryTags, tag) if (binaryTags) tag = await this.sendBinary(json as WANode, binaryTags, tag)
else tag = await this.sendJSON(json, tag) else tag = await this.sendJSON(json, tag)
@@ -276,15 +276,12 @@ export class WAConnection extends EventEmitter {
this.msgCount += 1 // increment message count, it makes the 'epoch' field when sending binary messages this.msgCount += 1 // increment message count, it makes the 'epoch' field when sending binary messages
return this.conn.send(m) return this.conn.send(m)
} }
protected async waitForConnection (waitForOpen: boolean=true) { protected async waitForConnection () {
if (!waitForOpen || this.state === 'open') return if (this.state === 'open') return
const timeout = this.pendingRequestTimeoutMs await Utils.promiseTimeout (
try { this.pendingRequestTimeoutMs,
await Utils.promiseTimeout (timeout, (resolve, reject) => this.pendingRequests.push({resolve, reject})) (resolve, reject) => this.pendingRequests.push({resolve, reject}))
} catch {
throw new Error('cannot send message, disconnected from WhatsApp')
}
} }
/** /**
* Disconnect from the phone. Your auth credentials become invalid after sending a disconnect request. * Disconnect from the phone. Your auth credentials become invalid after sending a disconnect request.
@@ -318,7 +315,7 @@ export class WAConnection extends EventEmitter {
this.endConnection () this.endConnection ()
if (reason === 'invalid_session' || reason === 'intentional') { if (reason === 'invalid_session' || reason === 'intentional') {
this.pendingRequests.forEach (({reject}) => reject(new Error('close'))) this.pendingRequests.forEach (({reject}) => reject(new Error(reason)))
this.pendingRequests = [] this.pendingRequests = []
} }
// reconnecting if the timeout is active for the reconnect loop // reconnecting if the timeout is active for the reconnect loop
@@ -327,7 +324,7 @@ export class WAConnection extends EventEmitter {
protected endConnection () { protected endConnection () {
this.conn?.removeAllListeners ('close') this.conn?.removeAllListeners ('close')
this.conn?.removeAllListeners ('message') this.conn?.removeAllListeners ('message')
this.conn?.close () //this.conn?.close ()
this.conn?.terminate() this.conn?.terminate()
this.conn = null this.conn = null
this.lastSeen = null this.lastSeen = null

View File

@@ -2,6 +2,7 @@ import * as Utils from './Utils'
import { WAMessage, WAChat, MessageLogLevel, WANode, KEEP_ALIVE_INTERVAL_MS, BaileysError, WAConnectOptions, DisconnectReason, UNAUTHORIZED_CODES, WAContact, TimedOutError, CancelledError, WAOpenResult } from './Constants' import { WAMessage, WAChat, MessageLogLevel, WANode, KEEP_ALIVE_INTERVAL_MS, BaileysError, WAConnectOptions, DisconnectReason, UNAUTHORIZED_CODES, WAContact, TimedOutError, CancelledError, WAOpenResult } from './Constants'
import {WAConnection as Base} from './1.Validation' import {WAConnection as Base} from './1.Validation'
import Decoder from '../Binary/Decoder' import Decoder from '../Binary/Decoder'
import WS from 'ws'
export class WAConnection extends Base { export class WAConnection extends Base {
/** Connect to WhatsApp Web */ /** Connect to WhatsApp Web */
@@ -62,48 +63,44 @@ export class WAConnection extends Base {
// actual connect // actual connect
const connect = () => { const connect = () => {
const timeoutMs = options?.timeoutMs || 60*1000 const timeoutMs = options?.timeoutMs || 60*1000
let task = Utils.promiseTimeout(timeoutMs, (resolve, reject) => {
const { ws, cancel } = Utils.openWebSocketConnection (5000, false) // determine whether reconnect should be used or not
const shouldUseReconnect = this.lastDisconnectReason !== DisconnectReason.replaced &&
this.lastDisconnectReason !== DisconnectReason.unknown &&
this.lastDisconnectReason !== DisconnectReason.intentional &&
this.user?.jid
const reconnectID = shouldUseReconnect ? this.user.jid.replace ('@s.whatsapp.net', '@c.us') : null
let task: Promise<void | { [k: string]: Partial<WAChat> }> = ws this.conn = new WS('wss://web.whatsapp.com/ws', null, { origin: 'https://web.whatsapp.com', timeout: timeoutMs })
.then (conn => this.conn = conn) this.conn.on('message', data => this.onMessageRecieved(data as any))
this.conn.on ('open', () => {
this.log(`connected to WhatsApp Web server, authenticating via ${reconnectID ? 'reconnect' : 'takeover'}`, MessageLogLevel.info)
this.authenticate(reconnectID)
.then (resolve)
.then (() => ( .then (() => (
this.conn.on('message', data => this.onMessageRecieved(data as any))
))
.then (() => (
this.log(`connected to WhatsApp Web server, authenticating via ${reconnectID ? 'reconnect' : 'takeover'}`, MessageLogLevel.info)
))
.then (() => this.authenticate(reconnectID))
.then (() => {
this.conn this.conn
.removeAllListeners ('error') .removeAllListeners ('error')
.removeAllListeners ('close') .removeAllListeners ('close')
}) ))
.catch (reject)
})
this.conn.on('error', reject)
this.conn.on('close', () => reject(new Error('close')))
})
let cancelTask: () => void let cancel: () => void
if (typeof options?.waitForChats === 'undefined' ? true : options?.waitForChats) { if (typeof options?.waitForChats === 'undefined' ? true : options?.waitForChats) {
const {waitForChats, cancelChats} = this.receiveChatsAndContacts() const {waitForChats, cancelChats} = this.receiveChatsAndContacts()
task = Promise.all ([task, waitForChats]).then (([_, updates]) => updates) task = Promise.all ([task, waitForChats]).then (([_, updates]) => updates)
cancelTask = () => { cancelChats(); cancel() } cancel = cancelChats
} else cancelTask = cancel }
task = task as Promise<void | { [k: string]: Partial<WAChat> }>
// determine whether reconnect should be used or not return { promise: task, cancel }
const shouldUseReconnect = this.lastDisconnectReason !== DisconnectReason.replaced &&
this.lastDisconnectReason !== DisconnectReason.unknown &&
this.lastDisconnectReason !== DisconnectReason.intentional &&
this.user?.jid
const reconnectID = shouldUseReconnect ? this.user.jid.replace ('@s.whatsapp.net', '@c.us') : null
const promise = Utils.promiseTimeout(timeoutMs, (resolve, reject) => (
task.then (resolve).catch (reject)
))
.catch (err => {
this.endConnection ()
throw err
}) as Promise<void | { [k: string]: Partial<WAChat> }>
return { promise, cancel: cancelTask }
} }
let promise = Promise.resolve () let promise = Promise.resolve ()
@@ -127,6 +124,9 @@ export class WAConnection extends Base {
const final = await result.promise const final = await result.promise
return final return final
} catch (error) {
this.endConnection ()
throw error
} finally { } finally {
cancel() cancel()
this.off('close', cancel) this.off('close', cancel)

View File

@@ -31,10 +31,10 @@ export class WAConnection extends Base {
} }
) as Promise<{status: number}> ) as Promise<{status: number}>
/** Request an update on the presence of a user */ /** Request an update on the presence of a user */
requestPresenceUpdate = async (jid: string) => this.query({json: ['action', 'presence', 'subscribe', jid]}) requestPresenceUpdate = async (jid: string) => this.query({ json: ['action', 'presence', 'subscribe', jid] })
/** Query the status of the person (see groupMetadata() for groups) */ /** Query the status of the person (see groupMetadata() for groups) */
async getStatus (jid?: string) { async getStatus (jid?: string) {
const status: { status: string } = await this.query({json: ['query', 'Status', jid || this.user.jid], expect200: true}) const status: { status: string } = await this.query({ json: ['query', 'Status', jid || this.user.jid] })
return status return status
} }
async setStatus (status: string) { async setStatus (status: string) {

View File

@@ -5,7 +5,6 @@ import {promises as fs} from 'fs'
import fetch from 'node-fetch' import fetch from 'node-fetch'
import { exec } from 'child_process' import { exec } from 'child_process'
import {platform, release} from 'os' import {platform, release} from 'os'
import WS from 'ws'
import Decoder from '../Binary/Decoder' import Decoder from '../Binary/Decoder'
import { MessageType, HKDFInfoKeys, MessageOptions, WAChat, WAMessageContent, BaileysError, WAMessageProto, TimedOutError, CancelledError } from './Constants' import { MessageType, HKDFInfoKeys, MessageOptions, WAChat, WAMessageContent, BaileysError, WAMessageProto, TimedOutError, CancelledError } from './Constants'
@@ -115,44 +114,6 @@ export async function promiseTimeout<T>(ms: number, promise: (resolve: (v?: T)=>
.finally (cancel) .finally (cancel)
return p as Promise<T> return p as Promise<T>
} }
export const openWebSocketConnection = (timeoutMs: number, retryOnNetworkError: boolean) => {
const newWS = async () => {
const conn = new WS('wss://web.whatsapp.com/ws', null, { origin: 'https://web.whatsapp.com', timeout: timeoutMs })
await new Promise ((resolve, reject) => {
conn.on('open', () => {
conn.removeAllListeners ('error')
conn.removeAllListeners ('close')
conn.removeAllListeners ('open')
resolve ()
})
// if there was an error in the WebSocket
conn.on('error', reject)
conn.on('close', () => reject(new Error('close')))
})
return conn
}
let cancelled = false
const connect = async () => {
while (!cancelled) {
try {
const ws = await newWS()
if (cancelled) {
ws.terminate ()
break
} else return ws
} catch (error) {
if (!retryOnNetworkError) throw error
await delay (1000)
}
}
throw CancelledError()
}
const cancel = () => cancelled = true
return { ws: connect(), cancel }
}
// whatsapp requires a message tag for every message, we just use the timestamp as one // whatsapp requires a message tag for every message, we just use the timestamp as one
export function generateMessageTag(epoch?: number) { export function generateMessageTag(epoch?: number) {
let tag = unixTimestampSeconds().toString() let tag = unixTimestampSeconds().toString()