diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index c811dad..f7e1892 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -1,7 +1,7 @@ import { Boom } from '@hapi/boom' import { proto } from '../../WAProto' -import { ALL_WA_PATCH_NAMES, AppStateChunk, ChatModification, ChatMutation, InitialReceivedChatsState, LTHashState, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAPatchCreate, WAPatchName, WAPresence } from '../Types' -import { chatModificationToAppPatch, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, newLTHashState, processSyncActions } from '../Utils' +import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, InitialReceivedChatsState, LTHashState, PresenceData, SocketConfig, SyncActionUpdates, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAPatchCreate, WAPatchName, WAPresence } from '../Types' +import { chatModificationToAppPatch, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, newAppStateChunk, newLTHashState, processSyncAction, syncActionUpdatesToEventMap } from '../Utils' import { makeMutex } from '../Utils/make-mutex' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary' import { makeMessagesSocket } from './messages-send' @@ -228,8 +228,24 @@ export const makeChatsSocket = (config: SocketConfig) => { }) } - const resyncAppState = async(collections: readonly WAPatchName[], ctx: InitialReceivedChatsState | undefined) => { - const appStateChunk: AppStateChunk = { totalMutations: [], collectionsToHandle: [] } + const newAppStateChunkHandler = (collections: readonly WAPatchName[], recvChats: InitialReceivedChatsState | undefined) => { + const appStateChunk = newAppStateChunk(collections) + return { + appStateChunk, + onMutation(mutation: ChatMutation) { + processSyncAction( + mutation, + appStateChunk.updates, + authState.creds.me, + recvChats ? { recvChats, accountSettings: authState.creds.accountSettings } : undefined, + logger + ) + } + } + } + + const resyncAppState = async(collections: readonly WAPatchName[], recvChats: InitialReceivedChatsState | undefined) => { + const { appStateChunk, onMutation } = newAppStateChunkHandler(collections, recvChats) // we use this to determine which events to fire // otherwise when we resync from scratch -- all notifications will fire const initialVersionMap: { [T in WAPatchName]?: number } = { } @@ -295,22 +311,19 @@ export const makeChatsSocket = (config: SocketConfig) => { const { patches, hasMorePatches, snapshot } = decoded[name] try { if(snapshot) { - const { state: newState, mutations } = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey, initialVersionMap[name]) + const { state: newState } = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey, initialVersionMap[name], onMutation) states[name] = newState logger.info( - { mutations: logger.level === 'trace' ? mutations : undefined }, - `restored state of ${name} from snapshot to v${newState.version} with ${mutations.length} mutations` + `restored state of ${name} from snapshot to v${newState.version} with mutations` ) await authState.keys.set({ 'app-state-sync-version': { [name]: newState } }) - - appStateChunk.totalMutations.push(...mutations) } // only process if there are syncd patches if(patches.length) { - const { newMutations, state: newState } = await decodePatches(name, patches, states[name], getAppStateSyncKey, initialVersionMap[name]) + const { newMutations, state: newState } = await decodePatches(name, patches, states[name], getAppStateSyncKey, onMutation, initialVersionMap[name]) await authState.keys.set({ 'app-state-sync-version': { [name]: newState } }) @@ -318,8 +331,6 @@ export const makeChatsSocket = (config: SocketConfig) => { if(newMutations.length) { logger.trace({ newMutations, name }, 'recv new mutations') } - - appStateChunk.totalMutations.push(...newMutations) } if(hasMorePatches) { @@ -328,13 +339,15 @@ export const makeChatsSocket = (config: SocketConfig) => { collectionsToHandle.delete(name) } } catch(error) { - logger.info({ name, error: error.stack }, 'failed to sync state from version, removing and trying from scratch') + // if retry attempts overshoot + // or key not found + const isIrrecoverableError = attemptsMap[name] >= MAX_SYNC_ATTEMPTS || error.output?.statusCode === 404 + logger.info({ name, error: error.stack }, `failed to sync state from version${isIrrecoverableError ? '' : ', removing and trying from scratch'}`) await authState.keys.set({ 'app-state-sync-version': { [name]: null } }) // increment number of retries attemptsMap[name] = (attemptsMap[name] || 0) + 1 - // if retry attempts overshoot - // or key not found - if(attemptsMap[name] >= MAX_SYNC_ATTEMPTS || error.output?.statusCode === 404) { + + if(isIrrecoverableError) { // stop retrying collectionsToHandle.delete(name) } @@ -344,7 +357,7 @@ export const makeChatsSocket = (config: SocketConfig) => { } ) - processSyncActionsLocal(appStateChunk.totalMutations, ctx) + processSyncActionsLocal(appStateChunk.updates) return appStateChunk } @@ -459,16 +472,10 @@ export const makeChatsSocket = (config: SocketConfig) => { ) } - const processSyncActionsLocal = (actions: ChatMutation[], recvChats: InitialReceivedChatsState | undefined) => { - const events = processSyncActions( - actions, - authState.creds.me!, - recvChats ? { recvChats, accountSettings: authState.creds.accountSettings } : undefined, - logger - ) - emitEventsFromMap(events) + const processSyncActionsLocal = (actions: SyncActionUpdates) => { + emitEventsFromMap(syncActionUpdatesToEventMap(actions)) // resend available presence to update name on servers - if(events['creds.update']?.me?.name && markOnlineOnConnect) { + if(actions.credsUpdates.me?.name && markOnlineOnConnect) { sendPresenceUpdate('available') } } @@ -542,8 +549,17 @@ export const makeChatsSocket = (config: SocketConfig) => { ) if(config.emitOwnEvents) { - const result = await decodePatches(name, [{ ...encodeResult.patch, version: { version: encodeResult.state.version }, }], initial, getAppStateSyncKey) - processSyncActionsLocal(result.newMutations, undefined) + const { appStateChunk, onMutation } = newAppStateChunkHandler([name], undefined) + await decodePatches( + name, + [{ ...encodeResult.patch, version: { version: encodeResult.state.version }, }], + initial, + getAppStateSyncKey, + onMutation, + undefined, + logger, + ) + processSyncActionsLocal(appStateChunk.updates) } } diff --git a/src/Types/Chat.ts b/src/Types/Chat.ts index 3381477..f8e55f4 100644 --- a/src/Types/Chat.ts +++ b/src/Types/Chat.ts @@ -1,6 +1,7 @@ import type { proto } from '../../WAProto' -import type { AccountSettings } from './Auth' -import type { MinimalMessage } from './Message' +import type { AccountSettings, AuthenticationCreds } from './Auth' +import { Contact } from './Contact' +import type { MinimalMessage, WAMessageUpdate } from './Message' /** set of statuses visible to other people; see updatePresence() in WhatsAppWeb.Send */ export type WAPresence = 'unavailable' | 'available' | 'composing' | 'recording' | 'paused' @@ -25,7 +26,19 @@ export type ChatMutation = { index: string[] } -export type AppStateChunk = { totalMutations : ChatMutation[], collectionsToHandle: WAPatchName[] } +export type SyncActionUpdates = { + credsUpdates: Partial + chatUpdates: { [jid: string]: Partial } + chatDeletes: string[] + contactUpserts: { [jid: string]: Contact } + msgUpdates: { [jid: string]: WAMessageUpdate } + msgDeletes: proto.IMessageKey[] +} + +export type AppStateChunk = { + updates: SyncActionUpdates + collectionsToHandle: WAPatchName[] +} export type WAPatchCreate = { syncAction: proto.ISyncActionValue @@ -76,7 +89,6 @@ export type ChatModification = } | { delete: true, lastMessages: LastMessageList } - export type InitialReceivedChatsState = { [jid: string]: { lastMsgRecvTimestamp: number } } diff --git a/src/Utils/chat-utils.ts b/src/Utils/chat-utils.ts index 6470a71..7a4ea1e 100644 --- a/src/Utils/chat-utils.ts +++ b/src/Utils/chat-utils.ts @@ -1,7 +1,7 @@ import { Boom } from '@hapi/boom' import type { Logger } from 'pino' import { proto } from '../../WAProto' -import { AuthenticationCreds, BaileysEventMap, Chat, ChatModification, ChatMutation, Contact, LastMessageList, LTHashState, WAMessageUpdate, WAPatchCreate, WAPatchName } from '../Types' +import { AppStateChunk, AuthenticationCreds, BaileysEventMap, Chat, ChatModification, ChatMutation, Contact, InitialAppStateSyncOptions, LastMessageList, LTHashState, SyncActionUpdates, WAPatchCreate, WAPatchName } from '../Types' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, isJidGroup, jidNormalizedUser } from '../WABinary' import { aesDecrypt, aesEncrypt, hkdf, hmacSign } from './crypto' import { toNumber } from './generics' @@ -115,6 +115,18 @@ const generatePatchMac = (snapshotMac: Uint8Array, valueMacs: Uint8Array[], vers export const newLTHashState = (): LTHashState => ({ version: 0, hash: Buffer.alloc(128), indexValueMap: {} }) +export const newAppStateChunk = (collectionsToHandle: readonly WAPatchName[]): AppStateChunk => ({ + updates: { + chatUpdates: { }, + credsUpdates: { }, + chatDeletes: [], + contactUpserts: { }, + msgDeletes: [], + msgUpdates: { } + }, + collectionsToHandle: [...collectionsToHandle], +}) + export const encodeSyncdPatch = async( { type, index, syncAction, apiVersion, operation }: WAPatchCreate, myAppStateKeyId: string, @@ -184,6 +196,7 @@ export const decodeSyncdMutations = async( msgMutations: (proto.ISyncdMutation | proto.ISyncdRecord)[], initialState: LTHashState, getAppStateSyncKey: FetchAppStateSyncKey, + onMutation: (mutation: ChatMutation) => void, validateMacs: boolean ) => { const keyCache: { [_: string]: ReturnType } = { } @@ -205,8 +218,6 @@ export const decodeSyncdMutations = async( } const ltGenerator = makeLtHashGenerator(initialState) - - const mutations: ChatMutation[] = [] // indexKey used to HMAC sign record.index.blob // valueEncryptionKey used to AES-256-CBC encrypt record.value.blob[0:-32] // the remaining record.value.blob[0:-32] is the mac, it the HMAC sign of key.keyId + decoded proto data + length of bytes in keyId @@ -238,10 +249,8 @@ export const decodeSyncdMutations = async( } const indexStr = Buffer.from(syncAction.index).toString() - mutations.push({ - syncAction, - index: JSON.parse(indexStr), - }) + onMutation({ syncAction, index: JSON.parse(indexStr) }) + ltGenerator.mix({ indexMac: record.index!.blob!, valueMac: ogValueMac, @@ -249,7 +258,7 @@ export const decodeSyncdMutations = async( }) } - return { mutations, ...ltGenerator.finish() } + return ltGenerator.finish() } export const decodeSyncdPatch = async( @@ -257,6 +266,7 @@ export const decodeSyncdPatch = async( name: WAPatchName, initialState: LTHashState, getAppStateSyncKey: FetchAppStateSyncKey, + onMutation: (mutation: ChatMutation) => void, validateMacs: boolean ) => { if(validateMacs) { @@ -271,7 +281,7 @@ export const decodeSyncdPatch = async( } } - const result = await decodeSyncdMutations(msg!.mutations!, initialState, getAppStateSyncKey, validateMacs) + const result = await decodeSyncdMutations(msg!.mutations!, initialState, getAppStateSyncKey, onMutation, validateMacs) return result } @@ -351,12 +361,28 @@ export const decodeSyncdSnapshot = async( snapshot: proto.ISyncdSnapshot, getAppStateSyncKey: FetchAppStateSyncKey, minimumVersionNumber: number | undefined, + onMutation?: (mutation: ChatMutation) => void, validateMacs: boolean = true ) => { const newState = newLTHashState() newState.version = toNumber(snapshot.version!.version!) - const { hash, indexValueMap, mutations } = await decodeSyncdMutations(snapshot.records!, newState, getAppStateSyncKey, validateMacs) + onMutation = onMutation || (() => { }) + + const { hash, indexValueMap } = await decodeSyncdMutations( + snapshot.records!, + newState, + getAppStateSyncKey, + mutation => { + if(onMutation) { + const areMutationsRequired = typeof minimumVersionNumber === 'undefined' || newState.version > minimumVersionNumber + if(areMutationsRequired) { + onMutation(mutation) + } + } + }, + validateMacs + ) newState.hash = hash newState.indexValueMap = indexValueMap @@ -374,15 +400,8 @@ export const decodeSyncdSnapshot = async( } } - const areMutationsRequired = typeof minimumVersionNumber === 'undefined' || newState.version > minimumVersionNumber - if(!areMutationsRequired) { - // clear array - mutations.splice(0, mutations.length) - } - return { state: newState, - mutations } } @@ -391,9 +410,12 @@ export const decodePatches = async( syncds: proto.ISyncdPatch[], initial: LTHashState, getAppStateSyncKey: FetchAppStateSyncKey, + onMutation: (mut: ChatMutation) => void, minimumVersionNumber?: number, + logger?: Logger, validateMacs: boolean = true ) => { + syncds = [...syncds] const successfulMutations: ChatMutation[] = [] const newState: LTHashState = { @@ -401,24 +423,24 @@ export const decodePatches = async( indexValueMap: { ...initial.indexValueMap } } - for(const syncd of syncds) { + while(syncds.length) { + const syncd = syncds[0] const { version, keyId, snapshotMac } = syncd if(syncd.externalMutations) { + logger?.trace({ name, version }, 'downloading external patch') const ref = await downloadExternalPatch(syncd.externalMutations) + logger?.debug({ name, version, mutations: ref.mutations.length }, 'downloaded external patch') syncd.mutations.push(...ref.mutations) } const patchVersion = toNumber(version.version!) newState.version = patchVersion - - const decodeResult = await decodeSyncdPatch(syncd, name, newState, getAppStateSyncKey, validateMacs) + const shouldMutate = typeof minimumVersionNumber === 'undefined' || patchVersion > minimumVersionNumber + const decodeResult = await decodeSyncdPatch(syncd, name, newState, getAppStateSyncKey, shouldMutate ? onMutation : (() => { }), validateMacs) newState.hash = decodeResult.hash newState.indexValueMap = decodeResult.indexValueMap - if(typeof minimumVersionNumber === 'undefined' || patchVersion > minimumVersionNumber) { - successfulMutations.push(...decodeResult.mutations) - } if(validateMacs) { const base64Key = Buffer.from(keyId!.id!).toString('base64') @@ -433,6 +455,11 @@ export const decodePatches = async( throw new Boom(`failed to verify LTHash at ${newState.version} of ${name}`) } } + + // clear memory used up by the mutations + syncd.mutations = [] + // pop first element + syncds.splice(0, 1) } return { @@ -572,83 +599,140 @@ export const chatModificationToAppPatch = ( return patch } -export const processSyncActions = ( - actions: ChatMutation[], +export const processSyncAction = ( + syncAction: ChatMutation, + { credsUpdates, chatUpdates, chatDeletes, contactUpserts, msgDeletes, msgUpdates }: SyncActionUpdates, me: Contact, - logger?: Logger + initialSyncOpts?: InitialAppStateSyncOptions, + logger?: Logger, +) => { + const isInitialSync = !!initialSyncOpts + const recvChats = initialSyncOpts?.recvChats + const accountSettings = initialSyncOpts?.accountSettings + + const { syncAction: { value: action }, index: [_, id, msgId, fromMe] } = syncAction + const update: Partial = { id } + if(action?.muteAction) { + update.mute = action.muteAction?.muted ? + toNumber(action.muteAction!.muteEndTimestamp!) : + undefined + } else if(action?.archiveChatAction) { + // okay so we've to do some annoying computation here + // when we're initially syncing the app state + // there are a few cases we need to handle + // 1. if the account unarchiveChats setting is true + // a. if the chat is archived, and no further messages have been received -- simple, keep archived + // b. if the chat was archived, and the user received messages from the other person afterwards + // then the chat should be marked unarchved -- + // we compare the timestamp of latest message from the other person to determine this + // 2. if the account unarchiveChats setting is false -- then it doesn't matter, + // it'll always take an app state action to mark in unarchived -- which we'll get anyway + const archiveAction = action.archiveChatAction + if( + isValidPatchBasedOnMessageRange(id, archiveAction.messageRange) + || !isInitialSync + || !accountSettings.unarchiveChats + ) { + // basically we don't need to fire an "archive" update if the chat is being marked unarchvied + // this only applies for the initial sync + if(isInitialSync && !archiveAction.archived) { + delete update.archive + } else { + update.archive = !!archiveAction?.archived + } + } + } else if(action?.markChatAsReadAction) { + const markReadAction = action.markChatAsReadAction + if( + isValidPatchBasedOnMessageRange(id, markReadAction.messageRange) + || !isInitialSync + ) { + // basically we don't need to fire an "read" update if the chat is being marked as read + // because the chat is read by default + // this only applies for the initial sync + if(isInitialSync && markReadAction.read) { + delete update.unreadCount + } else { + update.unreadCount = !!markReadAction?.read ? 0 : -1 + } + } + } else if(action?.clearChatAction) { + msgDeletes.push({ + remoteJid: id, + id: msgId, + fromMe: fromMe === '1' + }) + } else if(action?.contactAction) { + contactUpserts[id] = { + ...(contactUpserts[id] || {}), + id, + name: action.contactAction!.fullName + } + } else if(action?.pushNameSetting) { + if(me?.name !== action?.pushNameSetting) { + credsUpdates.me = { ...me, name: action?.pushNameSetting?.name! } + } + } else if(action?.pinAction) { + update.pin = action.pinAction?.pinned ? toNumber(action.timestamp) : null + } else if(action?.unarchiveChatsSetting) { + const unarchiveChats = !!action.unarchiveChatsSetting.unarchiveChats + credsUpdates.accountSettings = { unarchiveChats } + + logger.info(`archive setting updated => '${action.unarchiveChatsSetting.unarchiveChats}'`) + accountSettings.unarchiveChats = unarchiveChats + } else if(action?.starAction) { + const uqId = `${id},${msgId}` + const update = msgUpdates[uqId] || { + key: { remoteJid: id, id: msgId, fromMe: fromMe === '1' }, + update: { } + } + + update.update.starred = !!action.starAction?.starred + + msgUpdates[uqId] = update + } else if(action?.deleteChatAction) { + chatDeletes.push(id) + } else { + logger.warn({ syncAction, id }, 'unprocessable update') + } + + if(Object.keys(update).length > 1) { + chatUpdates[update.id] = { + ...(chatUpdates[update.id] || {}), + ...update + } + } else if(chatUpdates[update.id]) { + // remove if the update got cancelled + logger?.debug({ id: update.id }, 'cancelling update') + delete chatUpdates[update.id] + } + + function isValidPatchBasedOnMessageRange(id: string, msgRange: proto.ISyncActionMessageRange) { + const chat = recvChats?.[id] + const lastMsgTimestamp = msgRange.lastMessageTimestamp || msgRange.lastSystemMessageTimestamp || 0 + const chatLastMsgTimestamp = chat?.lastMsgRecvTimestamp || 0 + return lastMsgTimestamp >= chatLastMsgTimestamp + } +} + +export const syncActionUpdatesToEventMap = ( + { credsUpdates, chatUpdates, chatDeletes, contactUpserts, msgDeletes, msgUpdates }: SyncActionUpdates, ) => { const map: Partial> = { } - const updates: { [jid: string]: Partial } = {} - const contactUpdates: { [jid: string]: Contact } = {} - const msgDeletes: proto.IMessageKey[] = [] - const msgUpdates: { [_: string]: WAMessageUpdate } = { } - - for(const syncAction of actions) { - const { syncAction: { value: action }, index: [_, id, msgId, fromMe] } = syncAction - const update: Partial = { id } - if(action?.muteAction) { - update.mute = action.muteAction?.muted ? - toNumber(action.muteAction!.muteEndTimestamp!) : - undefined - } else if(action?.archiveChatAction) { - update.archive = !!action.archiveChatAction?.archived - } else if(action?.markChatAsReadAction) { - update.unreadCount = !!action.markChatAsReadAction?.read ? 0 : -1 - } else if(action?.clearChatAction) { - msgDeletes.push({ - remoteJid: id, - id: msgId, - fromMe: fromMe === '1' - }) - } else if(action?.contactAction) { - contactUpdates[id] = { - ...(contactUpdates[id] || {}), - id, - name: action.contactAction!.fullName - } - } else if(action?.pushNameSetting) { - if(me?.name !== action?.pushNameSetting) { - map['creds.update'] = map['creds.update'] || { } - map['creds.update'].me = { ...me, name: action?.pushNameSetting?.name! } - } - } else if(action?.pinAction) { - update.pin = action.pinAction?.pinned ? toNumber(action.timestamp) : null - } else if(action?.unarchiveChatsSetting) { - map['creds.update'] = map['creds.update'] || { } - map['creds.update'].accountSettings = { unarchiveChats: !!action.unarchiveChatsSetting.unarchiveChats } - - logger.info(`archive setting updated => '${action.unarchiveChatsSetting.unarchiveChats}'`) - } else if(action?.starAction) { - const uqId = `${id},${msgId}` - const update = msgUpdates[uqId] || { - key: { remoteJid: id, id: msgId, fromMe: fromMe === '1' }, - update: { } - } - - update.update.starred = !!action.starAction?.starred - - msgUpdates[uqId] = update - } else if(action?.deleteChatAction) { - map['chats.delete'] = map['chats.delete'] || [] - map['chats.delete'].push(id) - } else { - logger.warn({ syncAction, id }, 'unprocessable update') - } - - if(Object.keys(update).length > 1) { - updates[update.id] = { - ...(updates[update.id] || {}), - ...update - } - } + if(Object.keys(credsUpdates).length) { + map['creds.update'] = credsUpdates } - if(Object.values(updates).length) { - map['chats.update'] = Object.values(updates) + if(Object.values(chatUpdates).length) { + map['chats.update'] = Object.values(chatUpdates) } - if(Object.values(contactUpdates).length) { - map['contacts.upsert'] = Object.values(contactUpdates) + if(chatDeletes.length) { + map['chats.delete'] = chatDeletes + } + + if(Object.values(contactUpserts).length) { + map['contacts.upsert'] = Object.values(contactUpserts) } if(msgDeletes.length) {