diff --git a/package.json b/package.json index 967ec56..6cde14e 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,7 @@ "dependencies": { "@hapi/boom": "^9.1.3", "axios": "^0.24.0", - "futoin-hkdf": "^1.5.0", + "futoin-hkdf": "^1.5.1", "libsignal": "git+https://github.com/adiwajshing/libsignal-node", "music-metadata": "^7.12.3", "node-cache": "^5.1.2", diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index da6449f..84b7ce9 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -2,13 +2,13 @@ import { Boom } from '@hapi/boom' import { proto } from '../../WAProto' import { PROCESSABLE_HISTORY_TYPES } from '../Defaults' import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, LTHashState, MessageUpsertType, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAMessage, WAPatchCreate, WAPatchName, WAPresence } from '../Types' -import { chatModificationToAppPatch, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, getHistoryMsg, newLTHashState, processSyncAction } from '../Utils' +import { chatModificationToAppPatch, ChatMutationMap, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, getHistoryMsg, newLTHashState, processSyncAction } from '../Utils' import { makeMutex } from '../Utils/make-mutex' import processMessage from '../Utils/process-message' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary' import { makeSocket } from './socket' -const MAX_SYNC_ATTEMPTS = 5 +const MAX_SYNC_ATTEMPTS = 2 export const makeChatsSocket = (config: SocketConfig) => { const { logger, markOnlineOnConnect, shouldSyncHistoryMessage, fireInitQueries } = config @@ -293,16 +293,16 @@ export const makeChatsSocket = (config: SocketConfig) => { } const resyncAppState = ev.createBufferedFunction(async(collections: readonly WAPatchName[], isInitialSync: boolean) => { - const { onMutation } = newAppStateChunkHandler(isInitialSync) // we use this to determine which events to fire // otherwise when we resync from scratch -- all notifications will fire const initialVersionMap: { [T in WAPatchName]?: number } = { } + const globalMutationMap: ChatMutationMap = { } await authState.keys.transaction( async() => { const collectionsToHandle = new Set(collections) // in case something goes wrong -- ensure we don't enter a loop that cannot be exited from - const attemptsMap = { } as { [T in WAPatchName]: number | undefined } + const attemptsMap: { [T in WAPatchName]?: number } = { } // keep executing till all collections are done // sometimes a single patch request will not return all the patches (God knows why) // so we fetch till they're all done (this is determined by the "has_more_patches" flag) @@ -353,30 +353,29 @@ export const makeChatsSocket = (config: SocketConfig) => { ] }) - const decoded = await extractSyncdPatches(result, config?.options) // extract from binary node + // extract from binary node + const decoded = await extractSyncdPatches(result, config?.options) for(const key in decoded) { const name = key as WAPatchName const { patches, hasMorePatches, snapshot } = decoded[name] try { if(snapshot) { - const { state: newState } = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey, initialVersionMap[name], onMutation) + const { state: newState, mutationMap } = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey, initialVersionMap[name]) states[name] = newState + Object.assign(globalMutationMap, mutationMap) - logger.info( - `restored state of ${name} from snapshot to v${newState.version} with mutations` - ) + logger.info(`restored state of ${name} from snapshot to v${newState.version} with mutations`) await authState.keys.set({ 'app-state-sync-version': { [name]: newState } }) } // only process if there are syncd patches if(patches.length) { - const { state: newState } = await decodePatches( + const { state: newState, mutationMap } = await decodePatches( name, patches, states[name], getAppStateSyncKey, - onMutation, config.options, initialVersionMap[name] ) @@ -384,6 +383,9 @@ export const makeChatsSocket = (config: SocketConfig) => { await authState.keys.set({ 'app-state-sync-version': { [name]: newState } }) logger.info(`synced ${name} to v${newState.version}`) + initialVersionMap[name] = newState.version + + Object.assign(globalMutationMap, mutationMap) } if(hasMorePatches) { @@ -396,8 +398,11 @@ export const makeChatsSocket = (config: SocketConfig) => { // or key not found const isIrrecoverableError = attemptsMap[name]! >= MAX_SYNC_ATTEMPTS || error.output?.statusCode === 404 - || error.message.includes('TypeError') - logger.info({ name, error: error.stack }, `failed to sync state from version${isIrrecoverableError ? '' : ', removing and trying from scratch'}`) + || error.name === 'TypeError' + logger.info( + { name, error: error.stack }, + `failed to sync state from version${isIrrecoverableError ? '' : ', removing and trying from scratch'}` + ) await authState.keys.set({ 'app-state-sync-version': { [name]: null } }) // increment number of retries attemptsMap[name] = (attemptsMap[name] || 0) + 1 @@ -411,6 +416,11 @@ export const makeChatsSocket = (config: SocketConfig) => { } } ) + + const { onMutation } = newAppStateChunkHandler(isInitialSync) + for(const key in globalMutationMap) { + onMutation(globalMutationMap[key]) + } }) /** @@ -580,16 +590,18 @@ export const makeChatsSocket = (config: SocketConfig) => { if(config.emitOwnEvents) { const { onMutation } = newAppStateChunkHandler(false) - await decodePatches( + const { mutationMap } = await decodePatches( name, [{ ...encodeResult!.patch, version: { version: encodeResult!.state.version }, }], initial!, getAppStateSyncKey, - onMutation, config.options, undefined, logger, ) + for(const key in mutationMap) { + onMutation(mutationMap[key]) + } } } diff --git a/src/Utils/chat-utils.ts b/src/Utils/chat-utils.ts index d7e566c..31bda6f 100644 --- a/src/Utils/chat-utils.ts +++ b/src/Utils/chat-utils.ts @@ -11,6 +11,8 @@ import { downloadContentFromMessage, } from './messages-media' type FetchAppStateSyncKey = (keyId: string) => Promise +export type ChatMutationMap = { [index: string]: ChatMutation } + const mutationKeys = (keydata: Uint8Array) => { const expanded = hkdf(keydata, 160, { info: 'WhatsApp Mutation Keys' }) return { @@ -50,9 +52,9 @@ const generateMac = (operation: proto.SyncdMutation.SyncdOperation, data: Buffer } const to64BitNetworkOrder = (e: number) => { - const t = new ArrayBuffer(8) - new DataView(t).setUint32(4, e, !1) - return Buffer.from(t) + const buff = Buffer.alloc(8) + buff.writeUint32BE(e, 4) + return buff } type Mac = { indexMac: Uint8Array, valueMac: Uint8Array, operation: proto.SyncdMutation.SyncdOperation } @@ -84,7 +86,8 @@ const makeLtHashGenerator = ({ indexValueMap, hash }: Pick { - const result = LT_HASH_ANTI_TAMPERING.subtractThenAdd(new Uint8Array(hash).buffer, addBuffs, subBuffs) + const hashArrayBuffer = new Uint8Array(hash).buffer + const result = LT_HASH_ANTI_TAMPERING.subtractThenAdd(hashArrayBuffer, addBuffs, subBuffs) const buffer = Buffer.from(result) return { @@ -188,24 +191,6 @@ export const decodeSyncdMutations = async( onMutation: (mutation: ChatMutation) => void, validateMacs: boolean ) => { - const keyCache: { [_: string]: ReturnType } = { } - const getKey = async(keyId: Uint8Array) => { - const base64Key = Buffer.from(keyId!).toString('base64') - let key = keyCache[base64Key] - if(!key) { - const keyEnc = await getAppStateSyncKey(base64Key) - if(!keyEnc) { - throw new Boom(`failed to find key "${base64Key}" to decode mutation`, { statusCode: 404, data: { msgMutations } }) - } - - const result = mutationKeys(keyEnc.keyData!) - keyCache[base64Key] = result - key = result - } - - return key - } - const ltGenerator = makeLtHashGenerator(initialState) // indexKey used to HMAC sign record.index.blob // valueEncryptionKey used to AES-256-CBC encrypt record.value.blob[0:-32] @@ -248,6 +233,16 @@ export const decodeSyncdMutations = async( } return ltGenerator.finish() + + async function getKey(keyId: Uint8Array) { + const base64Key = Buffer.from(keyId!).toString('base64') + const keyEnc = await getAppStateSyncKey(base64Key) + if(!keyEnc) { + throw new Boom(`failed to find key "${base64Key}" to decode mutation`, { statusCode: 404, data: { msgMutations } }) + } + + return mutationKeys(keyEnc.keyData!) + } } export const decodeSyncdPatch = async( @@ -363,26 +358,25 @@ export const decodeSyncdSnapshot = async( snapshot: proto.ISyncdSnapshot, getAppStateSyncKey: FetchAppStateSyncKey, minimumVersionNumber: number | undefined, - onMutation?: (mutation: ChatMutation) => void, validateMacs: boolean = true ) => { const newState = newLTHashState() newState.version = toNumber(snapshot.version!.version!) - onMutation = onMutation || (() => { }) + const mutationMap: ChatMutationMap = {} + const areMutationsRequired = typeof minimumVersionNumber === 'undefined' + || newState.version > minimumVersionNumber const { hash, indexValueMap } = await decodeSyncdMutations( snapshot.records!, newState, getAppStateSyncKey, - mutation => { - if(onMutation) { - const areMutationsRequired = typeof minimumVersionNumber === 'undefined' || newState.version > minimumVersionNumber - if(areMutationsRequired) { - onMutation(mutation) - } + areMutationsRequired + ? (mutation) => { + const index = mutation.syncAction.index?.toString() + mutationMap[index!] = mutation } - }, + : () => { }, validateMacs ) newState.hash = hash @@ -392,18 +386,19 @@ export const decodeSyncdSnapshot = async( const base64Key = Buffer.from(snapshot.keyId!.id!).toString('base64') const keyEnc = await getAppStateSyncKey(base64Key) if(!keyEnc) { - throw new Boom(`failed to find key "${base64Key}" to decode mutation`, { statusCode: 500 }) + throw new Boom(`failed to find key "${base64Key}" to decode mutation`) } const result = mutationKeys(keyEnc.keyData!) const computedSnapshotMac = generateSnapshotMac(newState.hash, newState.version, name, result.snapshotMacKey) if(Buffer.compare(snapshot.mac!, computedSnapshotMac) !== 0) { - throw new Boom(`failed to verify LTHash at ${newState.version} of ${name} from snapshot`, { statusCode: 500 }) + throw new Boom(`failed to verify LTHash at ${newState.version} of ${name} from snapshot`) } } return { state: newState, + mutationMap } } @@ -412,21 +407,20 @@ export const decodePatches = async( syncds: proto.ISyncdPatch[], initial: LTHashState, getAppStateSyncKey: FetchAppStateSyncKey, - onMutation: (mut: ChatMutation) => void, options: AxiosRequestConfig, minimumVersionNumber?: number, logger?: Logger, validateMacs: boolean = true ) => { - syncds = [...syncds] - const newState: LTHashState = { ...initial, indexValueMap: { ...initial.indexValueMap } } - while(syncds.length) { - const syncd = syncds[0] + const mutationMap: ChatMutationMap = { } + + for(let i = 0;i < syncds.length;i++) { + const syncd = syncds[i] const { version, keyId, snapshotMac } = syncd if(syncd.externalMutations) { logger?.trace({ name, version }, 'downloading external patch') @@ -439,7 +433,20 @@ export const decodePatches = async( newState.version = patchVersion const shouldMutate = typeof minimumVersionNumber === 'undefined' || patchVersion > minimumVersionNumber - const decodeResult = await decodeSyncdPatch(syncd, name, newState, getAppStateSyncKey, shouldMutate ? onMutation : (() => { }), validateMacs) + + const decodeResult = await decodeSyncdPatch( + syncd, + name, + newState, + getAppStateSyncKey, + shouldMutate + ? mutation => { + const index = mutation.syncAction.index?.toString() + mutationMap[index!] = mutation + } + : (() => { }), + validateMacs + ) newState.hash = decodeResult.hash newState.indexValueMap = decodeResult.indexValueMap @@ -460,11 +467,9 @@ export const decodePatches = async( // clear memory used up by the mutations syncd.mutations = [] - // pop first element - syncds.splice(0, 1) } - return { state: newState } + return { state: newState, mutationMap } } export const chatModificationToAppPatch = ( diff --git a/yarn.lock b/yarn.lock index bc011f4..81821c4 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2448,7 +2448,7 @@ functions-have-names@^1.2.2: resolved "https://registry.yarnpkg.com/functions-have-names/-/functions-have-names-1.2.3.tgz#0404fe4ee2ba2f607f0e0ec3c80bae994133b834" integrity sha512-xckBUXyTIqT97tq2x2AMb+g163b5JFysYk0x4qxNFwbfQkmNZoiRHb6sPzI9/QV33WeuvVYBUIiD4NzNIyqaRQ== -futoin-hkdf@^1.5.0: +futoin-hkdf@^1.5.1: version "1.5.1" resolved "https://registry.yarnpkg.com/futoin-hkdf/-/futoin-hkdf-1.5.1.tgz#141f00427bc9950b38a42aa786b99c318b9b688d" integrity sha512-g5d0Qp7ks55hYmYmfqn4Nz18XH49lcCR+vvIvHT92xXnsJaGZmY1EtWQWilJ6BQp57heCIXM/rRo+AFep8hGgg==