diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index 8c16e66..50b8ba5 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -1,4 +1,4 @@ -import { SocketConfig, WAPresence, PresenceData, Chat, WAPatchCreate, WAMediaUpload, ChatMutation, WAPatchName, LTHashState, ChatModification, Contact } from "../Types"; +import { SocketConfig, WAPresence, PresenceData, Chat, WAPatchCreate, WAMediaUpload, ChatMutation, WAPatchName, AppStateChunk, LTHashState, ChatModification, Contact } from "../Types"; import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, S_WHATSAPP_NET, reduceBinaryNodeToDictionary } from "../WABinary"; import { proto } from '../../WAProto' import { generateProfilePicture, toNumber, encodeSyncdPatch, decodePatches, extractSyncdPatches, chatModificationToAppPatch, decodeSyncdSnapshot, newLTHashState } from "../Utils"; @@ -180,88 +180,101 @@ export const makeChatsSocket = (config: SocketConfig) => { const resyncAppStateInternal = async(collections: WAPatchName[], fromScratch: boolean = false, returnSnapshot: boolean = false) => { if(fromScratch) returnSnapshot = true - const totalMutations: ChatMutation[] = [] + const appStateChunk : AppStateChunk = {totalMutations: [], collectionsToHandle: []} await authState.keys.transaction( async() => { - const states = { } as { [T in WAPatchName]: LTHashState } - for(const name of collections) { - let state: LTHashState - if(!fromScratch) { - const result = await authState.keys.get('app-state-sync-version', [name]) - state = result[name] - } - if(!state) state = newLTHashState() - - states[name] = state - - logger.info(`resyncing ${name} from v${state.version}`) - } - const result = await query({ - tag: 'iq', - attrs: { - to: S_WHATSAPP_NET, - xmlns: 'w:sync:app:state', - type: 'set' - }, - content: [ - { - tag: 'sync', - attrs: { }, - content: collections.map( - (name) => ({ - tag: 'collection', - attrs: { - name, - version: states[name].version.toString(), - return_snapshot: returnSnapshot ? 'true' : 'false' - } - }) - ) + const collectionsToHandle = new Set(collections) + // keep executing till all collections are done + // sometimes a single patch request will not return all the patches (God knows why) + // so we fetch till they're all done (this is determined by the "has_more_patches" flag) + while(collectionsToHandle.size) { + const states = { } as { [T in WAPatchName]: LTHashState } + for(const name of collectionsToHandle) { + let state: LTHashState + if(!fromScratch) { + const result = await authState.keys.get('app-state-sync-version', [name]) + state = result[name] } - ] - }) + if(!state) state = newLTHashState() + + states[name] = state + + logger.info(`resyncing ${name} from v${state.version}`) + } + const result = await query({ + tag: 'iq', + attrs: { + to: S_WHATSAPP_NET, + xmlns: 'w:sync:app:state', + type: 'set' + }, + content: [ + { + tag: 'sync', + attrs: { }, + content: collections.map( + (name) => ({ + tag: 'collection', + attrs: { + name, + version: states[name].version.toString(), + return_snapshot: returnSnapshot ? 'true' : 'false' + } + }) + ) + } + ] + }) + + const decoded = await extractSyncdPatches(result) // extract from binary node + for(const key in decoded) { + const name = key as WAPatchName + const { patches, hasMorePatches, snapshot } = decoded[name] + if(snapshot) { + const newState = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey) + states[name] = newState + + logger.info(`restored state of ${name} from snapshot to v${newState.version}`) + } + // only process if there are syncd patches + if(patches.length) { + const { newMutations, state: newState } = await decodePatches(name, patches, states[name], getAppStateSyncKey, true) + + await authState.keys.set({ 'app-state-sync-version': { [name]: newState } }) - const decoded = await extractSyncdPatches(result) // extract from binary node - for(const key in decoded) { - const name = key as WAPatchName - const { patches, snapshot } = decoded[name] - if(snapshot) { - const newState = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey) - states[name] = newState + logger.info(`synced ${name} to v${newState.version}`) + if(newMutations.length) { + logger.trace({ newMutations, name }, 'recv new mutations') + } - logger.info(`restored state of ${name} from snapshot to v${newState.version}`) - } - // only process if there are syncd patches - if(patches.length) { - const { newMutations, state: newState } = await decodePatches(name, patches, states[name], getAppStateSyncKey, true) - - await authState.keys.set({ 'app-state-sync-version': { [name]: newState } }) - - logger.info(`synced ${name} to v${newState.version}`) - if(newMutations.length) { - logger.trace({ newMutations, name }, 'recv new mutations') + appStateChunk.totalMutations.push(...newMutations) + } + if(hasMorePatches) { + logger.info(`${name} has more patches...`) + } else { // collection is done with sync + collectionsToHandle.delete(name) } - - totalMutations.push(...newMutations) } } } ) - processSyncActions(totalMutations) + processSyncActions(appStateChunk.totalMutations) - return totalMutations + return appStateChunk } const resyncAppState = async(collections: WAPatchName[], returnSnapshot: boolean = false) => { - let result: ChatMutation[] + let result : AppStateChunk + try { result = await resyncAppStateInternal(collections, false, returnSnapshot) } catch(error) { logger.info({ collections, error: error.stack }, 'failed to sync state from version, trying from scratch') result = await resyncAppStateInternal(collections, true, true) } + return result } diff --git a/src/Types/Chat.ts b/src/Types/Chat.ts index 3d6c77d..994821e 100644 --- a/src/Types/Chat.ts +++ b/src/Types/Chat.ts @@ -12,6 +12,8 @@ export interface PresenceData { export type ChatMutation = { syncAction: proto.ISyncActionData, index: string[], indexMac: Uint8Array, valueMac: Uint8Array, operation: number } +export type AppStateChunk = { totalMutations : ChatMutation[], collectionsToHandle: WAPatchName[] } + export type WAPatchCreate = { syncAction: proto.ISyncActionValue index: string[] diff --git a/src/Utils/chat-utils.ts b/src/Utils/chat-utils.ts index 0990eea..0b25d5a 100644 --- a/src/Utils/chat-utils.ts +++ b/src/Utils/chat-utils.ts @@ -272,7 +272,7 @@ export const extractSyncdPatches = async(result: BinaryNode) => { const syncNode = getBinaryNodeChild(result, 'sync') const collectionNodes = getBinaryNodeChildren(syncNode, 'collection') - const final = { } as { [T in WAPatchName]: { patches: proto.ISyncdPatch[], snapshot?: proto.ISyncdSnapshot } } + const final = { } as { [T in WAPatchName]: { patches: proto.ISyncdPatch[], hasMorePatches: boolean, snapshot?: proto.ISyncdSnapshot } } await Promise.all( collectionNodes.map( async collectionNode => { @@ -283,6 +283,8 @@ export const extractSyncdPatches = async(result: BinaryNode) => { const syncds: proto.ISyncdPatch[] = [] const name = collectionNode.attrs.name as WAPatchName + + const hasMorePatches = collectionNode.attrs.has_more_patches == 'true' let snapshot: proto.ISyncdSnapshot | undefined = undefined if(snapshotNode && !!snapshotNode.content) { @@ -309,7 +311,7 @@ export const extractSyncdPatches = async(result: BinaryNode) => { } } - final[name] = { patches: syncds, snapshot } + final[name] = { patches: syncds, hasMorePatches, snapshot } } ) )