remove socketEvents, use ws instead for events

This commit is contained in:
Adhiraj Singh
2021-08-21 20:31:18 +05:30
parent 1fdc385307
commit 1a5b638859
5 changed files with 28 additions and 35 deletions

View File

@@ -1,5 +1,4 @@
import BinaryNode from "../BinaryNode"; import BinaryNode from "../BinaryNode";
import { EventEmitter } from 'events'
import { Chat, Contact, Presence, PresenceData, SocketConfig, WAFlag, WAMetric, WABusinessProfile, ChatModification, WAMessageKey, WAMessage, WAMessageUpdate } from "../Types"; import { Chat, Contact, Presence, PresenceData, SocketConfig, WAFlag, WAMetric, WABusinessProfile, ChatModification, WAMessageKey, WAMessage, WAMessageUpdate } from "../Types";
import { debouncedTimeout, unixTimestampSeconds, whatsappID } from "../Utils/generics"; import { debouncedTimeout, unixTimestampSeconds, whatsappID } from "../Utils/generics";
import makeAuthSocket from "./auth"; import makeAuthSocket from "./auth";
@@ -10,7 +9,7 @@ const makeChatsSocket = (config: SocketConfig) => {
const sock = makeAuthSocket(config) const sock = makeAuthSocket(config)
const { const {
ev, ev,
socketEvents, ws: socketEvents,
currentEpoch, currentEpoch,
setQuery, setQuery,
query, query,

View File

@@ -1,5 +1,4 @@
import BinaryNode from "../BinaryNode"; import BinaryNode from "../BinaryNode";
import { EventEmitter } from 'events'
import { SocketConfig, GroupModificationResponse, ParticipantAction, GroupMetadata, WAFlag, WAMetric, WAGroupCreateResponse, GroupParticipant } from "../Types"; import { SocketConfig, GroupModificationResponse, ParticipantAction, GroupMetadata, WAFlag, WAMetric, WAGroupCreateResponse, GroupParticipant } from "../Types";
import { generateMessageID, unixTimestampSeconds, whatsappID } from "../Utils/generics"; import { generateMessageID, unixTimestampSeconds, whatsappID } from "../Utils/generics";
import makeMessagesSocket from "./messages"; import makeMessagesSocket from "./messages";
@@ -9,7 +8,7 @@ const makeGroupsSocket = (config: SocketConfig) => {
const sock = makeMessagesSocket(config) const sock = makeMessagesSocket(config)
const { const {
ev, ev,
socketEvents, ws: socketEvents,
query, query,
generateMessageTag, generateMessageTag,
currentEpoch, currentEpoch,

View File

@@ -1,6 +1,5 @@
import { SocketConfig } from '../Types' import { SocketConfig } from '../Types'
import { DEFAULT_CONNECTION_CONFIG } from '../Defaults' import { DEFAULT_CONNECTION_CONFIG } from '../Defaults'
import { EventEmitter } from 'events'
import _makeConnection from './groups' import _makeConnection from './groups'
// export the last socket layer // export the last socket layer
const makeConnection = (config: Partial<SocketConfig>) => ( const makeConnection = (config: Partial<SocketConfig>) => (

View File

@@ -18,7 +18,7 @@ const makeMessagesSocket = (config: SocketConfig) => {
const sock = makeChatsSocket(config) const sock = makeChatsSocket(config)
const { const {
ev, ev,
socketEvents, ws: socketEvents,
query, query,
generateMessageTag, generateMessageTag,
currentEpoch, currentEpoch,

View File

@@ -1,5 +1,4 @@
import { Boom } from '@hapi/boom' import { Boom } from '@hapi/boom'
import EventEmitter from "events"
import { STATUS_CODES } from "http" import { STATUS_CODES } from "http"
import { promisify } from "util" import { promisify } from "util"
import WebSocket from "ws" import WebSocket from "ws"
@@ -25,8 +24,6 @@ export const makeSocket = ({
expectResponseTimeout, expectResponseTimeout,
phoneConnectionChanged phoneConnectionChanged
}: SocketConfig) => { }: SocketConfig) => {
const socketEvents = new EventEmitter()
socketEvents.setMaxListeners(0)
// for generating tags // for generating tags
const referenceDateSeconds = unixTimestampSeconds(new Date()) const referenceDateSeconds = unixTimestampSeconds(new Date())
const ws = new WebSocket(waWebSocketUrl, undefined, { const ws = new WebSocket(waWebSocketUrl, undefined, {
@@ -99,7 +96,7 @@ export const makeSocket = ({
const end = (error: Error | undefined) => { const end = (error: Error | undefined) => {
logger.debug({ error }, 'connection closed') logger.debug({ error }, 'connection closed')
ws.removeAllListeners('close') ws.removeAllListeners('close')
ws.removeAllListeners('error') ws.removeAllListeners('error')
ws.removeAllListeners('open') ws.removeAllListeners('open')
ws.removeAllListeners('message') ws.removeAllListeners('message')
@@ -111,8 +108,8 @@ export const makeSocket = ({
if(ws.readyState !== ws.CLOSED && ws.readyState !== ws.CLOSING) { if(ws.readyState !== ws.CLOSED && ws.readyState !== ws.CLOSING) {
try { ws.close() } catch { } try { ws.close() } catch { }
} }
ws.emit('ws-close', error) ws.emit('ws-close', error)
// so it cannot be re-emitted
ws.removeAllListeners('ws-close') ws.removeAllListeners('ws-close')
} }
const onMessageRecieved = (message: string | Buffer) => { const onMessageRecieved = (message: string | Buffer) => {
@@ -120,7 +117,7 @@ export const makeSocket = ({
// when the first character in the message is an '!', the server is sending a pong frame // when the first character in the message is an '!', the server is sending a pong frame
const timestamp = message.slice(1, message.length).toString ('utf-8') const timestamp = message.slice(1, message.length).toString ('utf-8')
lastDateRecv = new Date(parseInt(timestamp)) lastDateRecv = new Date(parseInt(timestamp))
socketEvents.emit('received-pong') ws.emit('received-pong')
} else { } else {
let messageTag: string let messageTag: string
let json: any let json: any
@@ -141,19 +138,19 @@ export const makeSocket = ({
let anyTriggered = false let anyTriggered = false
/* Check if this is a response to a message we sent */ /* Check if this is a response to a message we sent */
anyTriggered = socketEvents.emit(`${DEF_TAG_PREFIX}${messageTag}`, json) anyTriggered = ws.emit(`${DEF_TAG_PREFIX}${messageTag}`, json)
/* Check if this is a response to a message we are expecting */ /* Check if this is a response to a message we are expecting */
const l0 = json.header || json[0] || '' const l0 = json.header || json[0] || ''
const l1 = json?.attributes || json?.[1] || { } const l1 = json?.attributes || json?.[1] || { }
const l2 = json?.data?.[0]?.header || json[2]?.[0] || '' const l2 = json?.data?.[0]?.header || json[2]?.[0] || ''
Object.keys(l1).forEach(key => { Object.keys(l1).forEach(key => {
anyTriggered = socketEvents.emit(`${DEF_CALLBACK_PREFIX}${l0},${key}:${l1[key]},${l2}`, json) || anyTriggered anyTriggered = ws.emit(`${DEF_CALLBACK_PREFIX}${l0},${key}:${l1[key]},${l2}`, json) || anyTriggered
anyTriggered = socketEvents.emit(`${DEF_CALLBACK_PREFIX}${l0},${key}:${l1[key]}`, json) || anyTriggered anyTriggered = ws.emit(`${DEF_CALLBACK_PREFIX}${l0},${key}:${l1[key]}`, json) || anyTriggered
anyTriggered = socketEvents.emit(`${DEF_CALLBACK_PREFIX}${l0},${key}`, json) || anyTriggered anyTriggered = ws.emit(`${DEF_CALLBACK_PREFIX}${l0},${key}`, json) || anyTriggered
}) })
anyTriggered = socketEvents.emit(`${DEF_CALLBACK_PREFIX}${l0},,${l2}`, json) || anyTriggered anyTriggered = ws.emit(`${DEF_CALLBACK_PREFIX}${l0},,${l2}`, json) || anyTriggered
anyTriggered = socketEvents.emit(`${DEF_CALLBACK_PREFIX}${l0}`, json) || anyTriggered anyTriggered = ws.emit(`${DEF_CALLBACK_PREFIX}${l0}`, json) || anyTriggered
if (!anyTriggered && logger.level === 'debug') { if (!anyTriggered && logger.level === 'debug') {
logger.debug({ unhandled: true, tag: messageTag, fromMe: false, json }, 'communication recv') logger.debug({ unhandled: true, tag: messageTag, fromMe: false, json }, 'communication recv')
@@ -170,12 +167,12 @@ export const makeSocket = ({
logger.info({ tag }, `cancelling wait for message as a response is no longer expected from the phone`) logger.info({ tag }, `cancelling wait for message as a response is no longer expected from the phone`)
cancel(new Boom('Not expecting a response', { statusCode: 422 })) cancel(new Boom('Not expecting a response', { statusCode: 422 }))
}, expectResponseTimeout) }, expectResponseTimeout)
socketEvents.off(PHONE_CONNECTION_CB, listener) ws.off(PHONE_CONNECTION_CB, listener)
} }
} }
socketEvents.on(PHONE_CONNECTION_CB, listener) ws.on(PHONE_CONNECTION_CB, listener)
return () => { return () => {
socketEvents.off(PHONE_CONNECTION_CB, listener) ws.off(PHONE_CONNECTION_CB, listener)
timeout && clearTimeout(timeout) timeout && clearTimeout(timeout)
} }
} }
@@ -216,7 +213,7 @@ export const makeSocket = ({
let onRecv: (json) => void let onRecv: (json) => void
let onErr: (err) => void let onErr: (err) => void
let cancelPhoneChecker: () => void let cancelPhoneChecker: () => void
if (requiresPhoneConnection) { if(requiresPhoneConnection) {
startPhoneCheckInterval() startPhoneCheckInterval()
cancelPhoneChecker = exitQueryIfResponseNotExpected(tag, onErr) cancelPhoneChecker = exitQueryIfResponseNotExpected(tag, onErr)
} }
@@ -224,11 +221,12 @@ export const makeSocket = ({
const result = await promiseTimeout(timeoutMs, const result = await promiseTimeout(timeoutMs,
(resolve, reject) => { (resolve, reject) => {
onRecv = resolve onRecv = resolve
onErr = err => reject(err || new Boom('Connection Closed', { statusCode: 429 })) onErr = err => {
reject(err || new Boom('Connection Closed', { statusCode: 429 }))
}
socketEvents.on(`TAG:${tag}`, onRecv) ws.on(`TAG:${tag}`, onRecv)
ws.on('close', onErr) // if the socket closes, you'll never receive the message ws.on('ws-close', onErr) // if the socket closes, you'll never receive the message
ws.on('error', onErr)
}, },
) )
return result as any return result as any
@@ -236,9 +234,8 @@ export const makeSocket = ({
requiresPhoneConnection && clearPhoneCheckInterval() requiresPhoneConnection && clearPhoneCheckInterval()
cancelPhoneChecker && cancelPhoneChecker() cancelPhoneChecker && cancelPhoneChecker()
socketEvents.off(`TAG:${tag}`, onRecv) ws.off(`TAG:${tag}`, onRecv)
ws.off('close', onErr) // if the socket closes, you'll never receive the message ws.off('ws-close', onErr) // if the socket closes, you'll never receive the message
ws.off('error', onErr)
} }
} }
/** /**
@@ -303,9 +300,9 @@ export const makeSocket = ({
ws.on('error', onClose) ws.on('error', onClose)
}) })
.finally(() => { .finally(() => {
socketEvents.off('open', onOpen) ws.off('open', onOpen)
socketEvents.off('close', onClose) ws.off('close', onClose)
socketEvents.off('error', onClose) ws.off('error', onClose)
}) })
} }
@@ -317,7 +314,7 @@ export const makeSocket = ({
ws.on('error', end) ws.on('error', end)
ws.on('close', () => end(new Boom('Connection Terminated', { statusCode: DisconnectReason.connectionLost }))) ws.on('close', () => end(new Boom('Connection Terminated', { statusCode: DisconnectReason.connectionLost })))
socketEvents.on(PHONE_CONNECTION_CB, json => { ws.on(PHONE_CONNECTION_CB, json => {
if (!json[1]) { if (!json[1]) {
end(new Boom('Connection terminated by phone', { statusCode: DisconnectReason.connectionLost })) end(new Boom('Connection terminated by phone', { statusCode: DisconnectReason.connectionLost }))
logger.info('Connection terminated by phone, closing...') logger.info('Connection terminated by phone, closing...')
@@ -325,7 +322,7 @@ export const makeSocket = ({
phoneConnectionChanged(true) phoneConnectionChanged(true)
} }
}) })
socketEvents.on('CB:Cmd,type:disconnect', json => { ws.on('CB:Cmd,type:disconnect', json => {
const {kind} = json[1] const {kind} = json[1]
let reason: DisconnectReason let reason: DisconnectReason
switch(kind) { switch(kind) {
@@ -343,7 +340,6 @@ export const makeSocket = ({
}) })
return { return {
socketEvents,
ws, ws,
updateKeys: (info: { encKey: Buffer, macKey: Buffer }) => authInfo = info, updateKeys: (info: { encKey: Buffer, macKey: Buffer }) => authInfo = info,
waitForSocketOpen, waitForSocketOpen,