refactor: process sync actions out of socket

This commit is contained in:
Adhiraj Singh
2022-03-29 14:16:51 +05:30
parent ab9b232838
commit ec6e904db4
4 changed files with 87 additions and 72 deletions

View File

@@ -1,7 +1,7 @@
import { Boom } from '@hapi/boom' import { Boom } from '@hapi/boom'
import { proto } from '../../WAProto' import { proto } from '../../WAProto'
import { AppStateChunk, Chat, ChatModification, ChatMutation, Contact, LTHashState, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAPatchCreate, WAPatchName, WAPresence } from '../Types' import { AppStateChunk, ChatModification, ChatMutation, LTHashState, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAPatchCreate, WAPatchName, WAPresence } from '../Types'
import { chatModificationToAppPatch, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, newLTHashState, toNumber } from '../Utils' import { chatModificationToAppPatch, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, newLTHashState, processSyncActions } from '../Utils'
import { makeMutex } from '../Utils/make-mutex' import { makeMutex } from '../Utils/make-mutex'
import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary'
import { makeMessagesSocket } from './messages-send' import { makeMessagesSocket } from './messages-send'
@@ -20,6 +20,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
query, query,
fetchPrivacySettings, fetchPrivacySettings,
onUnexpectedError, onUnexpectedError,
emitEventsFromMap,
} = sock } = sock
const mutationMutex = makeMutex() const mutationMutex = makeMutex()
@@ -338,7 +339,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
} }
) )
processSyncActions(appStateChunk.totalMutations) processSyncActionsLocal(appStateChunk.totalMutations)
return appStateChunk return appStateChunk
} }
@@ -421,8 +422,8 @@ export const makeChatsSocket = (config: SocketConfig) => {
type = 'available' type = 'available'
} }
if (firstChild.attrs?.media === 'audio'){ if(firstChild.attrs?.media === 'audio') {
type = 'recording'; type = 'recording'
} }
presence = { lastKnownPresence: type } presence = { lastKnownPresence: type }
@@ -454,64 +455,9 @@ export const makeChatsSocket = (config: SocketConfig) => {
) )
} }
const processSyncActions = (actions: ChatMutation[]) => { const processSyncActionsLocal = (actions: ChatMutation[]) => {
const updates: { [jid: string]: Partial<Chat> } = {} const events = processSyncActions(actions, authState.creds.me!, logger)
const contactUpdates: { [jid: string]: Contact } = {} emitEventsFromMap(events)
const msgDeletes: proto.IMessageKey[] = []
for(const { syncAction: { value: action }, index: [_, id, msgId, fromMe] } of actions) {
const update: Partial<Chat> = { id }
if(action?.muteAction) {
update.mute = action.muteAction?.muted ?
toNumber(action.muteAction!.muteEndTimestamp!) :
undefined
} else if(action?.archiveChatAction) {
update.archive = !!action.archiveChatAction?.archived
} else if(action?.markChatAsReadAction) {
update.unreadCount = !!action.markChatAsReadAction?.read ? 0 : -1
} else if(action?.clearChatAction) {
msgDeletes.push({
remoteJid: id,
id: msgId,
fromMe: fromMe === '1'
})
} else if(action?.contactAction) {
contactUpdates[id] = {
...(contactUpdates[id] || {}),
id,
name: action.contactAction!.fullName
}
} else if(action?.pushNameSetting) {
const me = {
...authState.creds.me!,
name: action?.pushNameSetting?.name!
}
ev.emit('creds.update', { me })
} else if(action?.pinAction) {
update.pin = action.pinAction?.pinned ? toNumber(action.timestamp) : undefined
} else {
logger.warn({ action, id }, 'unprocessable update')
}
if(Object.keys(update).length > 1) {
updates[update.id] = {
...(updates[update.id] || {}),
...update
}
}
}
if(Object.values(updates).length) {
ev.emit('chats.update', Object.values(updates))
}
if(Object.values(contactUpdates).length) {
ev.emit('contacts.upsert', Object.values(contactUpdates))
}
if(msgDeletes.length) {
ev.emit('messages.delete', { keys: msgDeletes })
}
} }
const appPatch = async(patchCreate: WAPatchCreate) => { const appPatch = async(patchCreate: WAPatchCreate) => {
@@ -574,7 +520,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
if(config.emitOwnEvents) { if(config.emitOwnEvents) {
const result = await decodePatches(name, [{ ...patch, version: { version: state.version }, }], initial, getAppStateSyncKey) const result = await decodePatches(name, [{ ...patch, version: { version: state.version }, }], initial, getAppStateSyncKey)
processSyncActions(result.newMutations) processSyncActionsLocal(result.newMutations)
} }
} }
) )

View File

@@ -27,6 +27,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
relayMessage, relayMessage,
sendReceipt, sendReceipt,
resyncMainAppState, resyncMainAppState,
emitEventsFromMap,
} = sock } = sock
/** this mutex ensures that the notifications (receipts, messages etc.) are processed in order */ /** this mutex ensures that the notifications (receipts, messages etc.) are processed in order */
@@ -131,12 +132,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
}) })
} }
const emitEventsFromMap = (map: Partial<BaileysEventMap<any>>) => {
for(const key in map) {
ev.emit(key as any, map[key])
}
}
const processMessageLocal = async(message: proto.IWebMessageInfo) => { const processMessageLocal = async(message: proto.IWebMessageInfo) => {
const meId = authState.creds.me!.id const meId = authState.creds.me!.id
// send ack for history message // send ack for history message

View File

@@ -5,7 +5,7 @@ import { promisify } from 'util'
import WebSocket from 'ws' import WebSocket from 'ws'
import { proto } from '../../WAProto' import { proto } from '../../WAProto'
import { DEF_CALLBACK_PREFIX, DEF_TAG_PREFIX, DEFAULT_ORIGIN, KEY_BUNDLE_TYPE } from '../Defaults' import { DEF_CALLBACK_PREFIX, DEF_TAG_PREFIX, DEFAULT_ORIGIN, KEY_BUNDLE_TYPE } from '../Defaults'
import { AuthenticationCreds, BaileysEventEmitter, DisconnectReason, SocketConfig } from '../Types' import { AuthenticationCreds, BaileysEventEmitter, BaileysEventMap, DisconnectReason, SocketConfig } from '../Types'
import { addTransactionCapability, bindWaitForConnectionUpdate, configureSuccessfulPairing, Curve, encodeBigEndian, generateLoginNode, generateOrGetPreKeys, generateRegistrationNode, getPreKeys, makeNoiseHandler, printQRIfNecessaryListener, promiseTimeout, useSingleFileAuthState, xmppPreKey, xmppSignedPreKey } from '../Utils' import { addTransactionCapability, bindWaitForConnectionUpdate, configureSuccessfulPairing, Curve, encodeBigEndian, generateLoginNode, generateOrGetPreKeys, generateRegistrationNode, getPreKeys, makeNoiseHandler, printQRIfNecessaryListener, promiseTimeout, useSingleFileAuthState, xmppPreKey, xmppSignedPreKey } from '../Utils'
import { assertNodeErrorFree, BinaryNode, encodeBinaryNode, getBinaryNodeChild, S_WHATSAPP_NET } from '../WABinary' import { assertNodeErrorFree, BinaryNode, encodeBinaryNode, getBinaryNodeChild, S_WHATSAPP_NET } from '../WABinary'
@@ -408,6 +408,13 @@ export const makeSocket = ({
] ]
}) })
) )
const emitEventsFromMap = (map: Partial<BaileysEventMap<AuthenticationCreds>>) => {
for(const key in map) {
ev.emit(key as any, map[key])
}
}
/** logout & invalidate connection */ /** logout & invalidate connection */
const logout = async() => { const logout = async() => {
const jid = authState.creds.me?.id const jid = authState.creds.me?.id
@@ -584,6 +591,7 @@ export const makeSocket = ({
get user() { get user() {
return authState.creds.me return authState.creds.me
}, },
emitEventsFromMap,
assertingPreKeys, assertingPreKeys,
generateMessageTag, generateMessageTag,
query, query,

View File

@@ -1,6 +1,7 @@
import { Boom } from '@hapi/boom' import { Boom } from '@hapi/boom'
import type { Logger } from 'pino'
import { proto } from '../../WAProto' import { proto } from '../../WAProto'
import { ChatModification, ChatMutation, LastMessageList, LTHashState, WAPatchCreate, WAPatchName } from '../Types' import { AuthenticationCreds, BaileysEventMap, Chat, ChatModification, ChatMutation, Contact, LastMessageList, LTHashState, WAPatchCreate, WAPatchName } from '../Types'
import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren } from '../WABinary' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren } from '../WABinary'
import { aesDecrypt, aesEncrypt, hkdf, hmacSign } from './crypto' import { aesDecrypt, aesEncrypt, hkdf, hmacSign } from './crypto'
import { toNumber } from './generics' import { toNumber } from './generics'
@@ -539,3 +540,68 @@ export const chatModificationToAppPatch = (
return patch return patch
} }
export const processSyncActions = (
actions: ChatMutation[],
me: Contact,
logger?: Logger
) => {
const map: Partial<BaileysEventMap<AuthenticationCreds>> = { }
const updates: { [jid: string]: Partial<Chat> } = {}
const contactUpdates: { [jid: string]: Contact } = {}
const msgDeletes: proto.IMessageKey[] = []
for(const { syncAction: { value: action }, index: [_, id, msgId, fromMe] } of actions) {
const update: Partial<Chat> = { id }
if(action?.muteAction) {
update.mute = action.muteAction?.muted ?
toNumber(action.muteAction!.muteEndTimestamp!) :
undefined
} else if(action?.archiveChatAction) {
update.archive = !!action.archiveChatAction?.archived
} else if(action?.markChatAsReadAction) {
update.unreadCount = !!action.markChatAsReadAction?.read ? 0 : -1
} else if(action?.clearChatAction) {
msgDeletes.push({
remoteJid: id,
id: msgId,
fromMe: fromMe === '1'
})
} else if(action?.contactAction) {
contactUpdates[id] = {
...(contactUpdates[id] || {}),
id,
name: action.contactAction!.fullName
}
} else if(action?.pushNameSetting) {
map['creds.update'] = {
me: { ...me, name: action?.pushNameSetting?.name! }
}
} else if(action?.pinAction) {
update.pin = action.pinAction?.pinned ? toNumber(action.timestamp) : undefined
} else {
logger.warn({ action, id }, 'unprocessable update')
}
if(Object.keys(update).length > 1) {
updates[update.id] = {
...(updates[update.id] || {}),
...update
}
}
}
if(Object.values(updates).length) {
map['chats.update'] = Object.values(updates)
}
if(Object.values(contactUpdates).length) {
map['contacts.upsert'] = Object.values(contactUpdates)
}
if(msgDeletes.length) {
map['messages.delete'] = { keys: msgDeletes }
}
return map
}