diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index 6b55ec5..1866c32 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -143,7 +143,7 @@ export const makeChatsSocket = (config: SocketConfig) => { await query({ tag: 'iq', attrs: { - xmlns: 'blocklist', + xmlns: 'blocklist', to: S_WHATSAPP_NET, type: 'set' }, @@ -225,9 +225,12 @@ export const makeChatsSocket = (config: SocketConfig) => { }) } - const resyncAppState = async(collections: WAPatchName[], fromScratch: boolean = false) => { - const appStateChunk : AppStateChunk = {totalMutations: [], collectionsToHandle: []} - + const resyncAppState = async(collections: WAPatchName[]) => { + const appStateChunk: AppStateChunk = {totalMutations: [], collectionsToHandle: []} + // we use this to determine which events to fire + // otherwise when we resync from scratch -- all notifications will fire + const initialVersionMap: { [T in WAPatchName]?: number } = { } + await authState.keys.transaction( async() => { const collectionsToHandle = new Set(collections) @@ -241,12 +244,16 @@ export const makeChatsSocket = (config: SocketConfig) => { const nodes: BinaryNode[] = [] for(const name of collectionsToHandle) { - let state: LTHashState - if(!fromScratch) { - const result = await authState.keys.get('app-state-sync-version', [name]) - state = result[name] + const result = await authState.keys.get('app-state-sync-version', [name]) + let state = result[name] + + if(state) { + if(typeof initialVersionMap[name] === 'undefined') { + initialVersionMap[name] = state.version + } + } else { + state = newLTHashState() } - if(!state) state = newLTHashState() states[name] = state @@ -285,16 +292,18 @@ export const makeChatsSocket = (config: SocketConfig) => { const { patches, hasMorePatches, snapshot } = decoded[name] try { if(snapshot) { - const newState = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey) + const { state: newState, mutations } = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey, initialVersionMap[name]) states[name] = newState - logger.info(`restored state of ${name} from snapshot to v${newState.version}`) + logger.info(`restored state of ${name} from snapshot to v${newState.version} with ${mutations.length} mutations`) await authState.keys.set({ 'app-state-sync-version': { [name]: newState } }) + + appStateChunk.totalMutations.push(...mutations) } // only process if there are syncd patches if(patches.length) { - const { newMutations, state: newState } = await decodePatches(name, patches, states[name], getAppStateSyncKey, true) + const { newMutations, state: newState } = await decodePatches(name, patches, states[name], getAppStateSyncKey, initialVersionMap[name]) await authState.keys.set({ 'app-state-sync-version': { [name]: newState } }) @@ -313,9 +322,12 @@ export const makeChatsSocket = (config: SocketConfig) => { } catch(error) { logger.info({ name, error: error.stack }, 'failed to sync state from version, removing and trying from scratch') await authState.keys.set({ "app-state-sync-version": { [name]: null } }) - + // increment number of retries attemptsMap[name] = (attemptsMap[name] || 0) + 1 - if(attemptsMap[name] >= MAX_SYNC_ATTEMPTS) { + // if retry attempts overshoot + // or key not found + if(attemptsMap[name] >= MAX_SYNC_ATTEMPTS || error.output?.statusCode === 404) { + // stop retrying collectionsToHandle.delete(name) } } @@ -431,6 +443,8 @@ export const makeChatsSocket = (config: SocketConfig) => { } const processSyncActions = (actions: ChatMutation[]) => { + console.log(actions) + const updates: { [jid: string]: Partial } = {} const contactUpdates: { [jid: string]: Contact } = {} const msgDeletes: proto.IMessageKey[] = [] @@ -637,7 +651,7 @@ export const makeChatsSocket = (config: SocketConfig) => { const name = update.attrs.name as WAPatchName mutationMutex.mutex( async() => { - await resyncAppState([name], false) + await resyncAppState([name]) .catch(err => logger.error({ trace: err.stack, node }, `failed to sync state`)) } ) diff --git a/src/Types/Chat.ts b/src/Types/Chat.ts index 994821e..06b7e50 100644 --- a/src/Types/Chat.ts +++ b/src/Types/Chat.ts @@ -10,7 +10,10 @@ export interface PresenceData { lastSeen?: number } -export type ChatMutation = { syncAction: proto.ISyncActionData, index: string[], indexMac: Uint8Array, valueMac: Uint8Array, operation: number } +export type ChatMutation = { + syncAction: proto.ISyncActionData + index: string[] +} export type AppStateChunk = { totalMutations : ChatMutation[], collectionsToHandle: WAPatchName[] } diff --git a/src/Utils/chat-utils.ts b/src/Utils/chat-utils.ts index 0b25d5a..fd2fc10 100644 --- a/src/Utils/chat-utils.ts +++ b/src/Utils/chat-utils.ts @@ -187,7 +187,7 @@ export const decodeSyncdMutations = async( if(!key) { const keyEnc = await getAppStateSyncKey(base64Key) if(!keyEnc) { - throw new Boom(`failed to find key "${base64Key}" to decode mutation`, { statusCode: 500, data: { msgMutations } }) + throw new Boom(`failed to find key "${base64Key}" to decode mutation`, { statusCode: 404, data: { msgMutations } }) } const result = mutationKeys(keyEnc.keyData!) keyCache[base64Key] = result @@ -229,17 +229,16 @@ export const decodeSyncdMutations = async( } } - const indexStr = Buffer.from(syncAction.index).toString() - const mutation: ChatMutation = { + const indexStr = Buffer.from(syncAction.index).toString() + mutations.push({ syncAction, index: JSON.parse(indexStr), + }) + ltGenerator.mix({ indexMac: record.index!.blob!, valueMac: ogValueMac, - operation: operation, - } - mutations.push(mutation) - - ltGenerator.mix(mutation) + operation: operation + }) } return { mutations, ...ltGenerator.finish() } @@ -339,20 +338,15 @@ export const decodeSyncdSnapshot = async( name: WAPatchName, snapshot: proto.ISyncdSnapshot, getAppStateSyncKey: FetchAppStateSyncKey, + minimumVersionNumber: number | undefined, validateMacs: boolean = true ) => { const newState = newLTHashState() newState.version = toNumber(snapshot.version!.version!) - const records = snapshot.records! - - const ltGenerator = makeLtHashGenerator(newState) - for(const { index, value } of records) { - const valueMac = value.blob!.slice(-32)! - ltGenerator.mix({ indexMac: index.blob!, valueMac, operation: 0 }) - } - - Object.assign(newState, ltGenerator.finish()) + let { hash, indexValueMap, mutations } = await decodeSyncdMutations(snapshot.records!, newState, getAppStateSyncKey, validateMacs) + newState.hash = hash + newState.indexValueMap = indexValueMap if(validateMacs) { const base64Key = Buffer.from(snapshot.keyId!.id!).toString('base64') @@ -367,7 +361,15 @@ export const decodeSyncdSnapshot = async( } } - return newState + const areMutationsRequired = typeof minimumVersionNumber === 'undefined' || newState.version > minimumVersionNumber + if(!areMutationsRequired) { + mutations = [] + } + + return { + state: newState, + mutations + } } export const decodePatches = async( @@ -375,6 +377,7 @@ export const decodePatches = async( syncds: proto.ISyncdPatch[], initial: LTHashState, getAppStateSyncKey: FetchAppStateSyncKey, + minimumVersionNumber?: number, validateMacs: boolean = true ) => { const successfulMutations: ChatMutation[] = [] @@ -390,14 +393,18 @@ export const decodePatches = async( const ref = await downloadExternalPatch(syncd.externalMutations) syncd.mutations.push(...ref.mutations) } + + const patchVersion = toNumber(version.version!) - newState.version = toNumber(version.version!) + newState.version = patchVersion const decodeResult = await decodeSyncdPatch(syncd, name, newState, getAppStateSyncKey, validateMacs) newState.hash = decodeResult.hash newState.indexValueMap = decodeResult.indexValueMap - successfulMutations.push(...decodeResult.mutations) + if(typeof minimumVersionNumber === 'undefined' || patchVersion > minimumVersionNumber) { + successfulMutations.push(...decodeResult.mutations) + } if(validateMacs) { const base64Key = Buffer.from(keyId!.id!).toString('base64')