perf: avoid excess memory usage when syncing state

This commit is contained in:
Adhiraj Singh
2022-06-11 20:25:57 +05:30
parent a0548fbc4c
commit f87f89329b
3 changed files with 237 additions and 125 deletions

View File

@@ -1,7 +1,7 @@
import { Boom } from '@hapi/boom'
import { proto } from '../../WAProto'
import { ALL_WA_PATCH_NAMES, AppStateChunk, ChatModification, ChatMutation, InitialReceivedChatsState, LTHashState, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAPatchCreate, WAPatchName, WAPresence } from '../Types'
import { chatModificationToAppPatch, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, newLTHashState, processSyncActions } from '../Utils'
import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, InitialReceivedChatsState, LTHashState, PresenceData, SocketConfig, SyncActionUpdates, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAPatchCreate, WAPatchName, WAPresence } from '../Types'
import { chatModificationToAppPatch, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, newAppStateChunk, newLTHashState, processSyncAction, syncActionUpdatesToEventMap } from '../Utils'
import { makeMutex } from '../Utils/make-mutex'
import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary'
import { makeMessagesSocket } from './messages-send'
@@ -228,8 +228,24 @@ export const makeChatsSocket = (config: SocketConfig) => {
})
}
const resyncAppState = async(collections: readonly WAPatchName[], ctx: InitialReceivedChatsState | undefined) => {
const appStateChunk: AppStateChunk = { totalMutations: [], collectionsToHandle: [] }
const newAppStateChunkHandler = (collections: readonly WAPatchName[], recvChats: InitialReceivedChatsState | undefined) => {
const appStateChunk = newAppStateChunk(collections)
return {
appStateChunk,
onMutation(mutation: ChatMutation) {
processSyncAction(
mutation,
appStateChunk.updates,
authState.creds.me,
recvChats ? { recvChats, accountSettings: authState.creds.accountSettings } : undefined,
logger
)
}
}
}
const resyncAppState = async(collections: readonly WAPatchName[], recvChats: InitialReceivedChatsState | undefined) => {
const { appStateChunk, onMutation } = newAppStateChunkHandler(collections, recvChats)
// we use this to determine which events to fire
// otherwise when we resync from scratch -- all notifications will fire
const initialVersionMap: { [T in WAPatchName]?: number } = { }
@@ -295,22 +311,19 @@ export const makeChatsSocket = (config: SocketConfig) => {
const { patches, hasMorePatches, snapshot } = decoded[name]
try {
if(snapshot) {
const { state: newState, mutations } = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey, initialVersionMap[name])
const { state: newState } = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey, initialVersionMap[name], onMutation)
states[name] = newState
logger.info(
{ mutations: logger.level === 'trace' ? mutations : undefined },
`restored state of ${name} from snapshot to v${newState.version} with ${mutations.length} mutations`
`restored state of ${name} from snapshot to v${newState.version} with mutations`
)
await authState.keys.set({ 'app-state-sync-version': { [name]: newState } })
appStateChunk.totalMutations.push(...mutations)
}
// only process if there are syncd patches
if(patches.length) {
const { newMutations, state: newState } = await decodePatches(name, patches, states[name], getAppStateSyncKey, initialVersionMap[name])
const { newMutations, state: newState } = await decodePatches(name, patches, states[name], getAppStateSyncKey, onMutation, initialVersionMap[name])
await authState.keys.set({ 'app-state-sync-version': { [name]: newState } })
@@ -318,8 +331,6 @@ export const makeChatsSocket = (config: SocketConfig) => {
if(newMutations.length) {
logger.trace({ newMutations, name }, 'recv new mutations')
}
appStateChunk.totalMutations.push(...newMutations)
}
if(hasMorePatches) {
@@ -328,13 +339,15 @@ export const makeChatsSocket = (config: SocketConfig) => {
collectionsToHandle.delete(name)
}
} catch(error) {
logger.info({ name, error: error.stack }, 'failed to sync state from version, removing and trying from scratch')
// if retry attempts overshoot
// or key not found
const isIrrecoverableError = attemptsMap[name] >= MAX_SYNC_ATTEMPTS || error.output?.statusCode === 404
logger.info({ name, error: error.stack }, `failed to sync state from version${isIrrecoverableError ? '' : ', removing and trying from scratch'}`)
await authState.keys.set({ 'app-state-sync-version': { [name]: null } })
// increment number of retries
attemptsMap[name] = (attemptsMap[name] || 0) + 1
// if retry attempts overshoot
// or key not found
if(attemptsMap[name] >= MAX_SYNC_ATTEMPTS || error.output?.statusCode === 404) {
if(isIrrecoverableError) {
// stop retrying
collectionsToHandle.delete(name)
}
@@ -344,7 +357,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
}
)
processSyncActionsLocal(appStateChunk.totalMutations, ctx)
processSyncActionsLocal(appStateChunk.updates)
return appStateChunk
}
@@ -459,16 +472,10 @@ export const makeChatsSocket = (config: SocketConfig) => {
)
}
const processSyncActionsLocal = (actions: ChatMutation[], recvChats: InitialReceivedChatsState | undefined) => {
const events = processSyncActions(
actions,
authState.creds.me!,
recvChats ? { recvChats, accountSettings: authState.creds.accountSettings } : undefined,
logger
)
emitEventsFromMap(events)
const processSyncActionsLocal = (actions: SyncActionUpdates) => {
emitEventsFromMap(syncActionUpdatesToEventMap(actions))
// resend available presence to update name on servers
if(events['creds.update']?.me?.name && markOnlineOnConnect) {
if(actions.credsUpdates.me?.name && markOnlineOnConnect) {
sendPresenceUpdate('available')
}
}
@@ -542,8 +549,17 @@ export const makeChatsSocket = (config: SocketConfig) => {
)
if(config.emitOwnEvents) {
const result = await decodePatches(name, [{ ...encodeResult.patch, version: { version: encodeResult.state.version }, }], initial, getAppStateSyncKey)
processSyncActionsLocal(result.newMutations, undefined)
const { appStateChunk, onMutation } = newAppStateChunkHandler([name], undefined)
await decodePatches(
name,
[{ ...encodeResult.patch, version: { version: encodeResult.state.version }, }],
initial,
getAppStateSyncKey,
onMutation,
undefined,
logger,
)
processSyncActionsLocal(appStateChunk.updates)
}
}

View File

@@ -1,6 +1,7 @@
import type { proto } from '../../WAProto'
import type { AccountSettings } from './Auth'
import type { MinimalMessage } from './Message'
import type { AccountSettings, AuthenticationCreds } from './Auth'
import { Contact } from './Contact'
import type { MinimalMessage, WAMessageUpdate } from './Message'
/** set of statuses visible to other people; see updatePresence() in WhatsAppWeb.Send */
export type WAPresence = 'unavailable' | 'available' | 'composing' | 'recording' | 'paused'
@@ -25,7 +26,19 @@ export type ChatMutation = {
index: string[]
}
export type AppStateChunk = { totalMutations : ChatMutation[], collectionsToHandle: WAPatchName[] }
export type SyncActionUpdates = {
credsUpdates: Partial<AuthenticationCreds>
chatUpdates: { [jid: string]: Partial<Chat> }
chatDeletes: string[]
contactUpserts: { [jid: string]: Contact }
msgUpdates: { [jid: string]: WAMessageUpdate }
msgDeletes: proto.IMessageKey[]
}
export type AppStateChunk = {
updates: SyncActionUpdates
collectionsToHandle: WAPatchName[]
}
export type WAPatchCreate = {
syncAction: proto.ISyncActionValue
@@ -76,7 +89,6 @@ export type ChatModification =
} |
{ delete: true, lastMessages: LastMessageList }
export type InitialReceivedChatsState = {
[jid: string]: { lastMsgRecvTimestamp: number }
}

View File

@@ -1,7 +1,7 @@
import { Boom } from '@hapi/boom'
import type { Logger } from 'pino'
import { proto } from '../../WAProto'
import { AuthenticationCreds, BaileysEventMap, Chat, ChatModification, ChatMutation, Contact, LastMessageList, LTHashState, WAMessageUpdate, WAPatchCreate, WAPatchName } from '../Types'
import { AppStateChunk, AuthenticationCreds, BaileysEventMap, Chat, ChatModification, ChatMutation, Contact, InitialAppStateSyncOptions, LastMessageList, LTHashState, SyncActionUpdates, WAPatchCreate, WAPatchName } from '../Types'
import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, isJidGroup, jidNormalizedUser } from '../WABinary'
import { aesDecrypt, aesEncrypt, hkdf, hmacSign } from './crypto'
import { toNumber } from './generics'
@@ -115,6 +115,18 @@ const generatePatchMac = (snapshotMac: Uint8Array, valueMacs: Uint8Array[], vers
export const newLTHashState = (): LTHashState => ({ version: 0, hash: Buffer.alloc(128), indexValueMap: {} })
export const newAppStateChunk = (collectionsToHandle: readonly WAPatchName[]): AppStateChunk => ({
updates: {
chatUpdates: { },
credsUpdates: { },
chatDeletes: [],
contactUpserts: { },
msgDeletes: [],
msgUpdates: { }
},
collectionsToHandle: [...collectionsToHandle],
})
export const encodeSyncdPatch = async(
{ type, index, syncAction, apiVersion, operation }: WAPatchCreate,
myAppStateKeyId: string,
@@ -184,6 +196,7 @@ export const decodeSyncdMutations = async(
msgMutations: (proto.ISyncdMutation | proto.ISyncdRecord)[],
initialState: LTHashState,
getAppStateSyncKey: FetchAppStateSyncKey,
onMutation: (mutation: ChatMutation) => void,
validateMacs: boolean
) => {
const keyCache: { [_: string]: ReturnType<typeof mutationKeys> } = { }
@@ -205,8 +218,6 @@ export const decodeSyncdMutations = async(
}
const ltGenerator = makeLtHashGenerator(initialState)
const mutations: ChatMutation[] = []
// indexKey used to HMAC sign record.index.blob
// valueEncryptionKey used to AES-256-CBC encrypt record.value.blob[0:-32]
// the remaining record.value.blob[0:-32] is the mac, it the HMAC sign of key.keyId + decoded proto data + length of bytes in keyId
@@ -238,10 +249,8 @@ export const decodeSyncdMutations = async(
}
const indexStr = Buffer.from(syncAction.index).toString()
mutations.push({
syncAction,
index: JSON.parse(indexStr),
})
onMutation({ syncAction, index: JSON.parse(indexStr) })
ltGenerator.mix({
indexMac: record.index!.blob!,
valueMac: ogValueMac,
@@ -249,7 +258,7 @@ export const decodeSyncdMutations = async(
})
}
return { mutations, ...ltGenerator.finish() }
return ltGenerator.finish()
}
export const decodeSyncdPatch = async(
@@ -257,6 +266,7 @@ export const decodeSyncdPatch = async(
name: WAPatchName,
initialState: LTHashState,
getAppStateSyncKey: FetchAppStateSyncKey,
onMutation: (mutation: ChatMutation) => void,
validateMacs: boolean
) => {
if(validateMacs) {
@@ -271,7 +281,7 @@ export const decodeSyncdPatch = async(
}
}
const result = await decodeSyncdMutations(msg!.mutations!, initialState, getAppStateSyncKey, validateMacs)
const result = await decodeSyncdMutations(msg!.mutations!, initialState, getAppStateSyncKey, onMutation, validateMacs)
return result
}
@@ -351,12 +361,28 @@ export const decodeSyncdSnapshot = async(
snapshot: proto.ISyncdSnapshot,
getAppStateSyncKey: FetchAppStateSyncKey,
minimumVersionNumber: number | undefined,
onMutation?: (mutation: ChatMutation) => void,
validateMacs: boolean = true
) => {
const newState = newLTHashState()
newState.version = toNumber(snapshot.version!.version!)
const { hash, indexValueMap, mutations } = await decodeSyncdMutations(snapshot.records!, newState, getAppStateSyncKey, validateMacs)
onMutation = onMutation || (() => { })
const { hash, indexValueMap } = await decodeSyncdMutations(
snapshot.records!,
newState,
getAppStateSyncKey,
mutation => {
if(onMutation) {
const areMutationsRequired = typeof minimumVersionNumber === 'undefined' || newState.version > minimumVersionNumber
if(areMutationsRequired) {
onMutation(mutation)
}
}
},
validateMacs
)
newState.hash = hash
newState.indexValueMap = indexValueMap
@@ -374,15 +400,8 @@ export const decodeSyncdSnapshot = async(
}
}
const areMutationsRequired = typeof minimumVersionNumber === 'undefined' || newState.version > minimumVersionNumber
if(!areMutationsRequired) {
// clear array
mutations.splice(0, mutations.length)
}
return {
state: newState,
mutations
}
}
@@ -391,9 +410,12 @@ export const decodePatches = async(
syncds: proto.ISyncdPatch[],
initial: LTHashState,
getAppStateSyncKey: FetchAppStateSyncKey,
onMutation: (mut: ChatMutation) => void,
minimumVersionNumber?: number,
logger?: Logger,
validateMacs: boolean = true
) => {
syncds = [...syncds]
const successfulMutations: ChatMutation[] = []
const newState: LTHashState = {
@@ -401,24 +423,24 @@ export const decodePatches = async(
indexValueMap: { ...initial.indexValueMap }
}
for(const syncd of syncds) {
while(syncds.length) {
const syncd = syncds[0]
const { version, keyId, snapshotMac } = syncd
if(syncd.externalMutations) {
logger?.trace({ name, version }, 'downloading external patch')
const ref = await downloadExternalPatch(syncd.externalMutations)
logger?.debug({ name, version, mutations: ref.mutations.length }, 'downloaded external patch')
syncd.mutations.push(...ref.mutations)
}
const patchVersion = toNumber(version.version!)
newState.version = patchVersion
const decodeResult = await decodeSyncdPatch(syncd, name, newState, getAppStateSyncKey, validateMacs)
const shouldMutate = typeof minimumVersionNumber === 'undefined' || patchVersion > minimumVersionNumber
const decodeResult = await decodeSyncdPatch(syncd, name, newState, getAppStateSyncKey, shouldMutate ? onMutation : (() => { }), validateMacs)
newState.hash = decodeResult.hash
newState.indexValueMap = decodeResult.indexValueMap
if(typeof minimumVersionNumber === 'undefined' || patchVersion > minimumVersionNumber) {
successfulMutations.push(...decodeResult.mutations)
}
if(validateMacs) {
const base64Key = Buffer.from(keyId!.id!).toString('base64')
@@ -433,6 +455,11 @@ export const decodePatches = async(
throw new Boom(`failed to verify LTHash at ${newState.version} of ${name}`)
}
}
// clear memory used up by the mutations
syncd.mutations = []
// pop first element
syncds.splice(0, 1)
}
return {
@@ -572,83 +599,140 @@ export const chatModificationToAppPatch = (
return patch
}
export const processSyncActions = (
actions: ChatMutation[],
export const processSyncAction = (
syncAction: ChatMutation,
{ credsUpdates, chatUpdates, chatDeletes, contactUpserts, msgDeletes, msgUpdates }: SyncActionUpdates,
me: Contact,
logger?: Logger
initialSyncOpts?: InitialAppStateSyncOptions,
logger?: Logger,
) => {
const isInitialSync = !!initialSyncOpts
const recvChats = initialSyncOpts?.recvChats
const accountSettings = initialSyncOpts?.accountSettings
const { syncAction: { value: action }, index: [_, id, msgId, fromMe] } = syncAction
const update: Partial<Chat> = { id }
if(action?.muteAction) {
update.mute = action.muteAction?.muted ?
toNumber(action.muteAction!.muteEndTimestamp!) :
undefined
} else if(action?.archiveChatAction) {
// okay so we've to do some annoying computation here
// when we're initially syncing the app state
// there are a few cases we need to handle
// 1. if the account unarchiveChats setting is true
// a. if the chat is archived, and no further messages have been received -- simple, keep archived
// b. if the chat was archived, and the user received messages from the other person afterwards
// then the chat should be marked unarchved --
// we compare the timestamp of latest message from the other person to determine this
// 2. if the account unarchiveChats setting is false -- then it doesn't matter,
// it'll always take an app state action to mark in unarchived -- which we'll get anyway
const archiveAction = action.archiveChatAction
if(
isValidPatchBasedOnMessageRange(id, archiveAction.messageRange)
|| !isInitialSync
|| !accountSettings.unarchiveChats
) {
// basically we don't need to fire an "archive" update if the chat is being marked unarchvied
// this only applies for the initial sync
if(isInitialSync && !archiveAction.archived) {
delete update.archive
} else {
update.archive = !!archiveAction?.archived
}
}
} else if(action?.markChatAsReadAction) {
const markReadAction = action.markChatAsReadAction
if(
isValidPatchBasedOnMessageRange(id, markReadAction.messageRange)
|| !isInitialSync
) {
// basically we don't need to fire an "read" update if the chat is being marked as read
// because the chat is read by default
// this only applies for the initial sync
if(isInitialSync && markReadAction.read) {
delete update.unreadCount
} else {
update.unreadCount = !!markReadAction?.read ? 0 : -1
}
}
} else if(action?.clearChatAction) {
msgDeletes.push({
remoteJid: id,
id: msgId,
fromMe: fromMe === '1'
})
} else if(action?.contactAction) {
contactUpserts[id] = {
...(contactUpserts[id] || {}),
id,
name: action.contactAction!.fullName
}
} else if(action?.pushNameSetting) {
if(me?.name !== action?.pushNameSetting) {
credsUpdates.me = { ...me, name: action?.pushNameSetting?.name! }
}
} else if(action?.pinAction) {
update.pin = action.pinAction?.pinned ? toNumber(action.timestamp) : null
} else if(action?.unarchiveChatsSetting) {
const unarchiveChats = !!action.unarchiveChatsSetting.unarchiveChats
credsUpdates.accountSettings = { unarchiveChats }
logger.info(`archive setting updated => '${action.unarchiveChatsSetting.unarchiveChats}'`)
accountSettings.unarchiveChats = unarchiveChats
} else if(action?.starAction) {
const uqId = `${id},${msgId}`
const update = msgUpdates[uqId] || {
key: { remoteJid: id, id: msgId, fromMe: fromMe === '1' },
update: { }
}
update.update.starred = !!action.starAction?.starred
msgUpdates[uqId] = update
} else if(action?.deleteChatAction) {
chatDeletes.push(id)
} else {
logger.warn({ syncAction, id }, 'unprocessable update')
}
if(Object.keys(update).length > 1) {
chatUpdates[update.id] = {
...(chatUpdates[update.id] || {}),
...update
}
} else if(chatUpdates[update.id]) {
// remove if the update got cancelled
logger?.debug({ id: update.id }, 'cancelling update')
delete chatUpdates[update.id]
}
function isValidPatchBasedOnMessageRange(id: string, msgRange: proto.ISyncActionMessageRange) {
const chat = recvChats?.[id]
const lastMsgTimestamp = msgRange.lastMessageTimestamp || msgRange.lastSystemMessageTimestamp || 0
const chatLastMsgTimestamp = chat?.lastMsgRecvTimestamp || 0
return lastMsgTimestamp >= chatLastMsgTimestamp
}
}
export const syncActionUpdatesToEventMap = (
{ credsUpdates, chatUpdates, chatDeletes, contactUpserts, msgDeletes, msgUpdates }: SyncActionUpdates,
) => {
const map: Partial<BaileysEventMap<AuthenticationCreds>> = { }
const updates: { [jid: string]: Partial<Chat> } = {}
const contactUpdates: { [jid: string]: Contact } = {}
const msgDeletes: proto.IMessageKey[] = []
const msgUpdates: { [_: string]: WAMessageUpdate } = { }
for(const syncAction of actions) {
const { syncAction: { value: action }, index: [_, id, msgId, fromMe] } = syncAction
const update: Partial<Chat> = { id }
if(action?.muteAction) {
update.mute = action.muteAction?.muted ?
toNumber(action.muteAction!.muteEndTimestamp!) :
undefined
} else if(action?.archiveChatAction) {
update.archive = !!action.archiveChatAction?.archived
} else if(action?.markChatAsReadAction) {
update.unreadCount = !!action.markChatAsReadAction?.read ? 0 : -1
} else if(action?.clearChatAction) {
msgDeletes.push({
remoteJid: id,
id: msgId,
fromMe: fromMe === '1'
})
} else if(action?.contactAction) {
contactUpdates[id] = {
...(contactUpdates[id] || {}),
id,
name: action.contactAction!.fullName
}
} else if(action?.pushNameSetting) {
if(me?.name !== action?.pushNameSetting) {
map['creds.update'] = map['creds.update'] || { }
map['creds.update'].me = { ...me, name: action?.pushNameSetting?.name! }
}
} else if(action?.pinAction) {
update.pin = action.pinAction?.pinned ? toNumber(action.timestamp) : null
} else if(action?.unarchiveChatsSetting) {
map['creds.update'] = map['creds.update'] || { }
map['creds.update'].accountSettings = { unarchiveChats: !!action.unarchiveChatsSetting.unarchiveChats }
logger.info(`archive setting updated => '${action.unarchiveChatsSetting.unarchiveChats}'`)
} else if(action?.starAction) {
const uqId = `${id},${msgId}`
const update = msgUpdates[uqId] || {
key: { remoteJid: id, id: msgId, fromMe: fromMe === '1' },
update: { }
}
update.update.starred = !!action.starAction?.starred
msgUpdates[uqId] = update
} else if(action?.deleteChatAction) {
map['chats.delete'] = map['chats.delete'] || []
map['chats.delete'].push(id)
} else {
logger.warn({ syncAction, id }, 'unprocessable update')
}
if(Object.keys(update).length > 1) {
updates[update.id] = {
...(updates[update.id] || {}),
...update
}
}
if(Object.keys(credsUpdates).length) {
map['creds.update'] = credsUpdates
}
if(Object.values(updates).length) {
map['chats.update'] = Object.values(updates)
if(Object.values(chatUpdates).length) {
map['chats.update'] = Object.values(chatUpdates)
}
if(Object.values(contactUpdates).length) {
map['contacts.upsert'] = Object.values(contactUpdates)
if(chatDeletes.length) {
map['chats.delete'] = chatDeletes
}
if(Object.values(contactUpserts).length) {
map['contacts.upsert'] = Object.values(contactUpserts)
}
if(msgDeletes.length) {