From a9373fa0cd64480ae503bb34a75b1e07026f1042 Mon Sep 17 00:00:00 2001 From: Adhiraj Singh Date: Wed, 22 Dec 2021 21:37:39 +0530 Subject: [PATCH] refactor: cleaner + faster app state sync --- src/Socket/chats.ts | 97 ++++++++++++++++++++++++--------------------- 1 file changed, 51 insertions(+), 46 deletions(-) diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index 50b8ba5..29f852c 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -6,6 +6,8 @@ import { makeMessagesSocket } from "./messages-send"; import makeMutex from "../Utils/make-mutex"; import { Boom } from "@hapi/boom"; +const MAX_SYNC_ATTEMPTS = 5 + export const makeChatsSocket = (config: SocketConfig) => { const { logger } = config const sock = makeMessagesSocket(config) @@ -177,19 +179,21 @@ export const makeChatsSocket = (config: SocketConfig) => { }) } - const resyncAppStateInternal = async(collections: WAPatchName[], fromScratch: boolean = false, returnSnapshot: boolean = false) => { - if(fromScratch) returnSnapshot = true - + const resyncAppState = async(collections: WAPatchName[], fromScratch: boolean = false) => { const appStateChunk : AppStateChunk = {totalMutations: [], collectionsToHandle: []} await authState.keys.transaction( async() => { const collectionsToHandle = new Set(collections) + // in case something goes wrong -- ensure we don't enter a loop that cannot be exited from + const attemptsMap = { } as { [T in WAPatchName]: number | undefined } // keep executing till all collections are done // sometimes a single patch request will not return all the patches (God knows why) // so we fetch till they're all done (this is determined by the "has_more_patches" flag) while(collectionsToHandle.size) { const states = { } as { [T in WAPatchName]: LTHashState } + const nodes: BinaryNode[] = [] + for(const name of collectionsToHandle) { let state: LTHashState if(!fromScratch) { @@ -201,7 +205,18 @@ export const makeChatsSocket = (config: SocketConfig) => { states[name] = state logger.info(`resyncing ${name} from v${state.version}`) + + nodes.push({ + tag: 'collection', + attrs: { + name, + version: state.version.toString(), + // return snapshot if being synced from scratch + return_snapshot: (!state.version).toString() + } + }) } + const result = await query({ tag: 'iq', attrs: { @@ -213,16 +228,7 @@ export const makeChatsSocket = (config: SocketConfig) => { { tag: 'sync', attrs: { }, - content: collections.map( - (name) => ({ - tag: 'collection', - attrs: { - name, - version: states[name].version.toString(), - return_snapshot: returnSnapshot ? 'true' : 'false' - } - }) - ) + content: nodes } ] }) @@ -231,29 +237,41 @@ export const makeChatsSocket = (config: SocketConfig) => { for(const key in decoded) { const name = key as WAPatchName const { patches, hasMorePatches, snapshot } = decoded[name] - if(snapshot) { - const newState = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey) - states[name] = newState + try { + if(snapshot) { + const newState = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey) + states[name] = newState + + logger.info(`restored state of ${name} from snapshot to v${newState.version}`) - logger.info(`restored state of ${name} from snapshot to v${newState.version}`) - } - // only process if there are syncd patches - if(patches.length) { - const { newMutations, state: newState } = await decodePatches(name, patches, states[name], getAppStateSyncKey, true) - - await authState.keys.set({ 'app-state-sync-version': { [name]: newState } }) - - logger.info(`synced ${name} to v${newState.version}`) - if(newMutations.length) { - logger.trace({ newMutations, name }, 'recv new mutations') + await authState.keys.set({ 'app-state-sync-version': { [name]: newState } }) } + // only process if there are syncd patches + if(patches.length) { + const { newMutations, state: newState } = await decodePatches(name, patches, states[name], getAppStateSyncKey, true) + + await authState.keys.set({ 'app-state-sync-version': { [name]: newState } }) + + logger.info(`synced ${name} to v${newState.version}`) + if(newMutations.length) { + logger.trace({ newMutations, name }, 'recv new mutations') + } + + appStateChunk.totalMutations.push(...newMutations) + } + if(hasMorePatches) { + logger.info(`${name} has more patches...`) + } else { // collection is done with sync + collectionsToHandle.delete(name) + } + } catch(error) { + logger.info({ name, error: error.stack }, 'failed to sync state from version, removing and trying from scratch') + await authState.keys.set({ "app-state-sync-version": { [name]: null } }) - appStateChunk.totalMutations.push(...newMutations) - } - if(hasMorePatches) { - logger.info(`${name} has more patches...`) - } else { // collection is done with sync - collectionsToHandle.delete(name) + attemptsMap[name] = (attemptsMap[name] || 0) + 1 + if(attemptsMap[name] >= MAX_SYNC_ATTEMPTS) { + collectionsToHandle.delete(name) + } } } } @@ -265,19 +283,6 @@ export const makeChatsSocket = (config: SocketConfig) => { return appStateChunk } - const resyncAppState = async(collections: WAPatchName[], returnSnapshot: boolean = false) => { - let result : AppStateChunk - - try { - result = await resyncAppStateInternal(collections, false, returnSnapshot) - } catch(error) { - logger.info({ collections, error: error.stack }, 'failed to sync state from version, trying from scratch') - result = await resyncAppStateInternal(collections, true, true) - } - - return result - } - /** * fetch the profile picture of a user/group * type = "preview" for a low res picture