From ec6e904db47e1e0d0b4acea6af66defd5d518e21 Mon Sep 17 00:00:00 2001 From: Adhiraj Singh Date: Tue, 29 Mar 2022 14:16:51 +0530 Subject: [PATCH] refactor: process sync actions out of socket --- src/Socket/chats.ts | 74 +++++-------------------------------- src/Socket/messages-recv.ts | 7 +--- src/Socket/socket.ts | 10 ++++- src/Utils/chat-utils.ts | 68 +++++++++++++++++++++++++++++++++- 4 files changed, 87 insertions(+), 72 deletions(-) diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index 54a25a9..342c1b3 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -1,7 +1,7 @@ import { Boom } from '@hapi/boom' import { proto } from '../../WAProto' -import { AppStateChunk, Chat, ChatModification, ChatMutation, Contact, LTHashState, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAPatchCreate, WAPatchName, WAPresence } from '../Types' -import { chatModificationToAppPatch, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, newLTHashState, toNumber } from '../Utils' +import { AppStateChunk, ChatModification, ChatMutation, LTHashState, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAPatchCreate, WAPatchName, WAPresence } from '../Types' +import { chatModificationToAppPatch, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, newLTHashState, processSyncActions } from '../Utils' import { makeMutex } from '../Utils/make-mutex' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary' import { makeMessagesSocket } from './messages-send' @@ -20,6 +20,7 @@ export const makeChatsSocket = (config: SocketConfig) => { query, fetchPrivacySettings, onUnexpectedError, + emitEventsFromMap, } = sock const mutationMutex = makeMutex() @@ -338,7 +339,7 @@ export const makeChatsSocket = (config: SocketConfig) => { } ) - processSyncActions(appStateChunk.totalMutations) + processSyncActionsLocal(appStateChunk.totalMutations) return appStateChunk } @@ -421,8 +422,8 @@ export const makeChatsSocket = (config: SocketConfig) => { type = 'available' } - if (firstChild.attrs?.media === 'audio'){ - type = 'recording'; + if(firstChild.attrs?.media === 'audio') { + type = 'recording' } presence = { lastKnownPresence: type } @@ -454,64 +455,9 @@ export const makeChatsSocket = (config: SocketConfig) => { ) } - const processSyncActions = (actions: ChatMutation[]) => { - const updates: { [jid: string]: Partial } = {} - const contactUpdates: { [jid: string]: Contact } = {} - const msgDeletes: proto.IMessageKey[] = [] - - for(const { syncAction: { value: action }, index: [_, id, msgId, fromMe] } of actions) { - const update: Partial = { id } - if(action?.muteAction) { - update.mute = action.muteAction?.muted ? - toNumber(action.muteAction!.muteEndTimestamp!) : - undefined - } else if(action?.archiveChatAction) { - update.archive = !!action.archiveChatAction?.archived - } else if(action?.markChatAsReadAction) { - update.unreadCount = !!action.markChatAsReadAction?.read ? 0 : -1 - } else if(action?.clearChatAction) { - msgDeletes.push({ - remoteJid: id, - id: msgId, - fromMe: fromMe === '1' - }) - } else if(action?.contactAction) { - contactUpdates[id] = { - ...(contactUpdates[id] || {}), - id, - name: action.contactAction!.fullName - } - } else if(action?.pushNameSetting) { - const me = { - ...authState.creds.me!, - name: action?.pushNameSetting?.name! - } - ev.emit('creds.update', { me }) - } else if(action?.pinAction) { - update.pin = action.pinAction?.pinned ? toNumber(action.timestamp) : undefined - } else { - logger.warn({ action, id }, 'unprocessable update') - } - - if(Object.keys(update).length > 1) { - updates[update.id] = { - ...(updates[update.id] || {}), - ...update - } - } - } - - if(Object.values(updates).length) { - ev.emit('chats.update', Object.values(updates)) - } - - if(Object.values(contactUpdates).length) { - ev.emit('contacts.upsert', Object.values(contactUpdates)) - } - - if(msgDeletes.length) { - ev.emit('messages.delete', { keys: msgDeletes }) - } + const processSyncActionsLocal = (actions: ChatMutation[]) => { + const events = processSyncActions(actions, authState.creds.me!, logger) + emitEventsFromMap(events) } const appPatch = async(patchCreate: WAPatchCreate) => { @@ -574,7 +520,7 @@ export const makeChatsSocket = (config: SocketConfig) => { if(config.emitOwnEvents) { const result = await decodePatches(name, [{ ...patch, version: { version: state.version }, }], initial, getAppStateSyncKey) - processSyncActions(result.newMutations) + processSyncActionsLocal(result.newMutations) } } ) diff --git a/src/Socket/messages-recv.ts b/src/Socket/messages-recv.ts index 1cbd697..4bf9239 100644 --- a/src/Socket/messages-recv.ts +++ b/src/Socket/messages-recv.ts @@ -27,6 +27,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { relayMessage, sendReceipt, resyncMainAppState, + emitEventsFromMap, } = sock /** this mutex ensures that the notifications (receipts, messages etc.) are processed in order */ @@ -131,12 +132,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { }) } - const emitEventsFromMap = (map: Partial>) => { - for(const key in map) { - ev.emit(key as any, map[key]) - } - } - const processMessageLocal = async(message: proto.IWebMessageInfo) => { const meId = authState.creds.me!.id // send ack for history message diff --git a/src/Socket/socket.ts b/src/Socket/socket.ts index 4ac9a54..63e671d 100644 --- a/src/Socket/socket.ts +++ b/src/Socket/socket.ts @@ -5,7 +5,7 @@ import { promisify } from 'util' import WebSocket from 'ws' import { proto } from '../../WAProto' import { DEF_CALLBACK_PREFIX, DEF_TAG_PREFIX, DEFAULT_ORIGIN, KEY_BUNDLE_TYPE } from '../Defaults' -import { AuthenticationCreds, BaileysEventEmitter, DisconnectReason, SocketConfig } from '../Types' +import { AuthenticationCreds, BaileysEventEmitter, BaileysEventMap, DisconnectReason, SocketConfig } from '../Types' import { addTransactionCapability, bindWaitForConnectionUpdate, configureSuccessfulPairing, Curve, encodeBigEndian, generateLoginNode, generateOrGetPreKeys, generateRegistrationNode, getPreKeys, makeNoiseHandler, printQRIfNecessaryListener, promiseTimeout, useSingleFileAuthState, xmppPreKey, xmppSignedPreKey } from '../Utils' import { assertNodeErrorFree, BinaryNode, encodeBinaryNode, getBinaryNodeChild, S_WHATSAPP_NET } from '../WABinary' @@ -408,6 +408,13 @@ export const makeSocket = ({ ] }) ) + + const emitEventsFromMap = (map: Partial>) => { + for(const key in map) { + ev.emit(key as any, map[key]) + } + } + /** logout & invalidate connection */ const logout = async() => { const jid = authState.creds.me?.id @@ -584,6 +591,7 @@ export const makeSocket = ({ get user() { return authState.creds.me }, + emitEventsFromMap, assertingPreKeys, generateMessageTag, query, diff --git a/src/Utils/chat-utils.ts b/src/Utils/chat-utils.ts index 0b6b521..9d68683 100644 --- a/src/Utils/chat-utils.ts +++ b/src/Utils/chat-utils.ts @@ -1,6 +1,7 @@ import { Boom } from '@hapi/boom' +import type { Logger } from 'pino' import { proto } from '../../WAProto' -import { ChatModification, ChatMutation, LastMessageList, LTHashState, WAPatchCreate, WAPatchName } from '../Types' +import { AuthenticationCreds, BaileysEventMap, Chat, ChatModification, ChatMutation, Contact, LastMessageList, LTHashState, WAPatchCreate, WAPatchName } from '../Types' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren } from '../WABinary' import { aesDecrypt, aesEncrypt, hkdf, hmacSign } from './crypto' import { toNumber } from './generics' @@ -538,4 +539,69 @@ export const chatModificationToAppPatch = ( patch.syncAction.timestamp = Date.now() return patch +} + +export const processSyncActions = ( + actions: ChatMutation[], + me: Contact, + logger?: Logger +) => { + const map: Partial> = { } + const updates: { [jid: string]: Partial } = {} + const contactUpdates: { [jid: string]: Contact } = {} + const msgDeletes: proto.IMessageKey[] = [] + + for(const { syncAction: { value: action }, index: [_, id, msgId, fromMe] } of actions) { + const update: Partial = { id } + if(action?.muteAction) { + update.mute = action.muteAction?.muted ? + toNumber(action.muteAction!.muteEndTimestamp!) : + undefined + } else if(action?.archiveChatAction) { + update.archive = !!action.archiveChatAction?.archived + } else if(action?.markChatAsReadAction) { + update.unreadCount = !!action.markChatAsReadAction?.read ? 0 : -1 + } else if(action?.clearChatAction) { + msgDeletes.push({ + remoteJid: id, + id: msgId, + fromMe: fromMe === '1' + }) + } else if(action?.contactAction) { + contactUpdates[id] = { + ...(contactUpdates[id] || {}), + id, + name: action.contactAction!.fullName + } + } else if(action?.pushNameSetting) { + map['creds.update'] = { + me: { ...me, name: action?.pushNameSetting?.name! } + } + } else if(action?.pinAction) { + update.pin = action.pinAction?.pinned ? toNumber(action.timestamp) : undefined + } else { + logger.warn({ action, id }, 'unprocessable update') + } + + if(Object.keys(update).length > 1) { + updates[update.id] = { + ...(updates[update.id] || {}), + ...update + } + } + } + + if(Object.values(updates).length) { + map['chats.update'] = Object.values(updates) + } + + if(Object.values(contactUpdates).length) { + map['contacts.upsert'] = Object.values(contactUpdates) + } + + if(msgDeletes.length) { + map['messages.delete'] = { keys: msgDeletes } + } + + return map } \ No newline at end of file