From d15bde5d17d61e2872d96321768464693de58c7e Mon Sep 17 00:00:00 2001 From: Adhiraj Singh Date: Fri, 7 Jan 2022 14:29:52 +0530 Subject: [PATCH] refactor: app state handling 1. fixes snapshot patches not being included 2. fixes all mutations being passed when syncing from scratch 3. simpler chat mutation model 4. do not retry if key is not found --- src/Socket/chats.ts | 44 +++++++++++++++++++++++++------------- src/Types/Chat.ts | 5 ++++- src/Utils/chat-utils.ts | 47 +++++++++++++++++++++++------------------ 3 files changed, 60 insertions(+), 36 deletions(-) diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index 6b55ec5..1866c32 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -143,7 +143,7 @@ export const makeChatsSocket = (config: SocketConfig) => { await query({ tag: 'iq', attrs: { - xmlns: 'blocklist', + xmlns: 'blocklist', to: S_WHATSAPP_NET, type: 'set' }, @@ -225,9 +225,12 @@ export const makeChatsSocket = (config: SocketConfig) => { }) } - const resyncAppState = async(collections: WAPatchName[], fromScratch: boolean = false) => { - const appStateChunk : AppStateChunk = {totalMutations: [], collectionsToHandle: []} - + const resyncAppState = async(collections: WAPatchName[]) => { + const appStateChunk: AppStateChunk = {totalMutations: [], collectionsToHandle: []} + // we use this to determine which events to fire + // otherwise when we resync from scratch -- all notifications will fire + const initialVersionMap: { [T in WAPatchName]?: number } = { } + await authState.keys.transaction( async() => { const collectionsToHandle = new Set(collections) @@ -241,12 +244,16 @@ export const makeChatsSocket = (config: SocketConfig) => { const nodes: BinaryNode[] = [] for(const name of collectionsToHandle) { - let state: LTHashState - if(!fromScratch) { - const result = await authState.keys.get('app-state-sync-version', [name]) - state = result[name] + const result = await authState.keys.get('app-state-sync-version', [name]) + let state = result[name] + + if(state) { + if(typeof initialVersionMap[name] === 'undefined') { + initialVersionMap[name] = state.version + } + } else { + state = newLTHashState() } - if(!state) state = newLTHashState() states[name] = state @@ -285,16 +292,18 @@ export const makeChatsSocket = (config: SocketConfig) => { const { patches, hasMorePatches, snapshot } = decoded[name] try { if(snapshot) { - const newState = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey) + const { state: newState, mutations } = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey, initialVersionMap[name]) states[name] = newState - logger.info(`restored state of ${name} from snapshot to v${newState.version}`) + logger.info(`restored state of ${name} from snapshot to v${newState.version} with ${mutations.length} mutations`) await authState.keys.set({ 'app-state-sync-version': { [name]: newState } }) + + appStateChunk.totalMutations.push(...mutations) } // only process if there are syncd patches if(patches.length) { - const { newMutations, state: newState } = await decodePatches(name, patches, states[name], getAppStateSyncKey, true) + const { newMutations, state: newState } = await decodePatches(name, patches, states[name], getAppStateSyncKey, initialVersionMap[name]) await authState.keys.set({ 'app-state-sync-version': { [name]: newState } }) @@ -313,9 +322,12 @@ export const makeChatsSocket = (config: SocketConfig) => { } catch(error) { logger.info({ name, error: error.stack }, 'failed to sync state from version, removing and trying from scratch') await authState.keys.set({ "app-state-sync-version": { [name]: null } }) - + // increment number of retries attemptsMap[name] = (attemptsMap[name] || 0) + 1 - if(attemptsMap[name] >= MAX_SYNC_ATTEMPTS) { + // if retry attempts overshoot + // or key not found + if(attemptsMap[name] >= MAX_SYNC_ATTEMPTS || error.output?.statusCode === 404) { + // stop retrying collectionsToHandle.delete(name) } } @@ -431,6 +443,8 @@ export const makeChatsSocket = (config: SocketConfig) => { } const processSyncActions = (actions: ChatMutation[]) => { + console.log(actions) + const updates: { [jid: string]: Partial } = {} const contactUpdates: { [jid: string]: Contact } = {} const msgDeletes: proto.IMessageKey[] = [] @@ -637,7 +651,7 @@ export const makeChatsSocket = (config: SocketConfig) => { const name = update.attrs.name as WAPatchName mutationMutex.mutex( async() => { - await resyncAppState([name], false) + await resyncAppState([name]) .catch(err => logger.error({ trace: err.stack, node }, `failed to sync state`)) } ) diff --git a/src/Types/Chat.ts b/src/Types/Chat.ts index 994821e..06b7e50 100644 --- a/src/Types/Chat.ts +++ b/src/Types/Chat.ts @@ -10,7 +10,10 @@ export interface PresenceData { lastSeen?: number } -export type ChatMutation = { syncAction: proto.ISyncActionData, index: string[], indexMac: Uint8Array, valueMac: Uint8Array, operation: number } +export type ChatMutation = { + syncAction: proto.ISyncActionData + index: string[] +} export type AppStateChunk = { totalMutations : ChatMutation[], collectionsToHandle: WAPatchName[] } diff --git a/src/Utils/chat-utils.ts b/src/Utils/chat-utils.ts index 0b25d5a..fd2fc10 100644 --- a/src/Utils/chat-utils.ts +++ b/src/Utils/chat-utils.ts @@ -187,7 +187,7 @@ export const decodeSyncdMutations = async( if(!key) { const keyEnc = await getAppStateSyncKey(base64Key) if(!keyEnc) { - throw new Boom(`failed to find key "${base64Key}" to decode mutation`, { statusCode: 500, data: { msgMutations } }) + throw new Boom(`failed to find key "${base64Key}" to decode mutation`, { statusCode: 404, data: { msgMutations } }) } const result = mutationKeys(keyEnc.keyData!) keyCache[base64Key] = result @@ -229,17 +229,16 @@ export const decodeSyncdMutations = async( } } - const indexStr = Buffer.from(syncAction.index).toString() - const mutation: ChatMutation = { + const indexStr = Buffer.from(syncAction.index).toString() + mutations.push({ syncAction, index: JSON.parse(indexStr), + }) + ltGenerator.mix({ indexMac: record.index!.blob!, valueMac: ogValueMac, - operation: operation, - } - mutations.push(mutation) - - ltGenerator.mix(mutation) + operation: operation + }) } return { mutations, ...ltGenerator.finish() } @@ -339,20 +338,15 @@ export const decodeSyncdSnapshot = async( name: WAPatchName, snapshot: proto.ISyncdSnapshot, getAppStateSyncKey: FetchAppStateSyncKey, + minimumVersionNumber: number | undefined, validateMacs: boolean = true ) => { const newState = newLTHashState() newState.version = toNumber(snapshot.version!.version!) - const records = snapshot.records! - - const ltGenerator = makeLtHashGenerator(newState) - for(const { index, value } of records) { - const valueMac = value.blob!.slice(-32)! - ltGenerator.mix({ indexMac: index.blob!, valueMac, operation: 0 }) - } - - Object.assign(newState, ltGenerator.finish()) + let { hash, indexValueMap, mutations } = await decodeSyncdMutations(snapshot.records!, newState, getAppStateSyncKey, validateMacs) + newState.hash = hash + newState.indexValueMap = indexValueMap if(validateMacs) { const base64Key = Buffer.from(snapshot.keyId!.id!).toString('base64') @@ -367,7 +361,15 @@ export const decodeSyncdSnapshot = async( } } - return newState + const areMutationsRequired = typeof minimumVersionNumber === 'undefined' || newState.version > minimumVersionNumber + if(!areMutationsRequired) { + mutations = [] + } + + return { + state: newState, + mutations + } } export const decodePatches = async( @@ -375,6 +377,7 @@ export const decodePatches = async( syncds: proto.ISyncdPatch[], initial: LTHashState, getAppStateSyncKey: FetchAppStateSyncKey, + minimumVersionNumber?: number, validateMacs: boolean = true ) => { const successfulMutations: ChatMutation[] = [] @@ -390,14 +393,18 @@ export const decodePatches = async( const ref = await downloadExternalPatch(syncd.externalMutations) syncd.mutations.push(...ref.mutations) } + + const patchVersion = toNumber(version.version!) - newState.version = toNumber(version.version!) + newState.version = patchVersion const decodeResult = await decodeSyncdPatch(syncd, name, newState, getAppStateSyncKey, validateMacs) newState.hash = decodeResult.hash newState.indexValueMap = decodeResult.indexValueMap - successfulMutations.push(...decodeResult.mutations) + if(typeof minimumVersionNumber === 'undefined' || patchVersion > minimumVersionNumber) { + successfulMutations.push(...decodeResult.mutations) + } if(validateMacs) { const base64Key = Buffer.from(keyId!.id!).toString('base64')