Fix #1010 by asking for full list of sync states when needed (#1043)

* Process patches that are longer.

* Fixed type declaration

* Changed resync behavior from recusive to iterative

* refactor: cleaner handling of hasMorePatches

Co-authored-by: Adhiraj Singh <adhirajsingh1001@gmail.com>
This commit is contained in:
burstfreeze
2021-12-22 16:27:58 +01:00
committed by GitHub
parent 4710f6603a
commit 98af4a6624
3 changed files with 80 additions and 63 deletions

View File

@@ -1,4 +1,4 @@
import { SocketConfig, WAPresence, PresenceData, Chat, WAPatchCreate, WAMediaUpload, ChatMutation, WAPatchName, LTHashState, ChatModification, Contact } from "../Types"; import { SocketConfig, WAPresence, PresenceData, Chat, WAPatchCreate, WAMediaUpload, ChatMutation, WAPatchName, AppStateChunk, LTHashState, ChatModification, Contact } from "../Types";
import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, S_WHATSAPP_NET, reduceBinaryNodeToDictionary } from "../WABinary"; import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, S_WHATSAPP_NET, reduceBinaryNodeToDictionary } from "../WABinary";
import { proto } from '../../WAProto' import { proto } from '../../WAProto'
import { generateProfilePicture, toNumber, encodeSyncdPatch, decodePatches, extractSyncdPatches, chatModificationToAppPatch, decodeSyncdSnapshot, newLTHashState } from "../Utils"; import { generateProfilePicture, toNumber, encodeSyncdPatch, decodePatches, extractSyncdPatches, chatModificationToAppPatch, decodeSyncdSnapshot, newLTHashState } from "../Utils";
@@ -180,88 +180,101 @@ export const makeChatsSocket = (config: SocketConfig) => {
const resyncAppStateInternal = async(collections: WAPatchName[], fromScratch: boolean = false, returnSnapshot: boolean = false) => { const resyncAppStateInternal = async(collections: WAPatchName[], fromScratch: boolean = false, returnSnapshot: boolean = false) => {
if(fromScratch) returnSnapshot = true if(fromScratch) returnSnapshot = true
const totalMutations: ChatMutation[] = [] const appStateChunk : AppStateChunk = {totalMutations: [], collectionsToHandle: []}
await authState.keys.transaction( await authState.keys.transaction(
async() => { async() => {
const states = { } as { [T in WAPatchName]: LTHashState } const collectionsToHandle = new Set<string>(collections)
for(const name of collections) { // keep executing till all collections are done
let state: LTHashState // sometimes a single patch request will not return all the patches (God knows why)
if(!fromScratch) { // so we fetch till they're all done (this is determined by the "has_more_patches" flag)
const result = await authState.keys.get('app-state-sync-version', [name]) while(collectionsToHandle.size) {
state = result[name] const states = { } as { [T in WAPatchName]: LTHashState }
} for(const name of collectionsToHandle) {
if(!state) state = newLTHashState() let state: LTHashState
if(!fromScratch) {
states[name] = state const result = await authState.keys.get('app-state-sync-version', [name])
state = result[name]
logger.info(`resyncing ${name} from v${state.version}`)
}
const result = await query({
tag: 'iq',
attrs: {
to: S_WHATSAPP_NET,
xmlns: 'w:sync:app:state',
type: 'set'
},
content: [
{
tag: 'sync',
attrs: { },
content: collections.map(
(name) => ({
tag: 'collection',
attrs: {
name,
version: states[name].version.toString(),
return_snapshot: returnSnapshot ? 'true' : 'false'
}
})
)
} }
] if(!state) state = newLTHashState()
})
states[name] = state
logger.info(`resyncing ${name} from v${state.version}`)
}
const result = await query({
tag: 'iq',
attrs: {
to: S_WHATSAPP_NET,
xmlns: 'w:sync:app:state',
type: 'set'
},
content: [
{
tag: 'sync',
attrs: { },
content: collections.map(
(name) => ({
tag: 'collection',
attrs: {
name,
version: states[name].version.toString(),
return_snapshot: returnSnapshot ? 'true' : 'false'
}
})
)
}
]
})
const decoded = await extractSyncdPatches(result) // extract from binary node
for(const key in decoded) {
const name = key as WAPatchName
const { patches, hasMorePatches, snapshot } = decoded[name]
if(snapshot) {
const newState = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey)
states[name] = newState
logger.info(`restored state of ${name} from snapshot to v${newState.version}`)
}
// only process if there are syncd patches
if(patches.length) {
const { newMutations, state: newState } = await decodePatches(name, patches, states[name], getAppStateSyncKey, true)
await authState.keys.set({ 'app-state-sync-version': { [name]: newState } })
const decoded = await extractSyncdPatches(result) // extract from binary node logger.info(`synced ${name} to v${newState.version}`)
for(const key in decoded) { if(newMutations.length) {
const name = key as WAPatchName logger.trace({ newMutations, name }, 'recv new mutations')
const { patches, snapshot } = decoded[name] }
if(snapshot) {
const newState = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey)
states[name] = newState
logger.info(`restored state of ${name} from snapshot to v${newState.version}`) appStateChunk.totalMutations.push(...newMutations)
} }
// only process if there are syncd patches if(hasMorePatches) {
if(patches.length) { logger.info(`${name} has more patches...`)
const { newMutations, state: newState } = await decodePatches(name, patches, states[name], getAppStateSyncKey, true) } else { // collection is done with sync
collectionsToHandle.delete(name)
await authState.keys.set({ 'app-state-sync-version': { [name]: newState } })
logger.info(`synced ${name} to v${newState.version}`)
if(newMutations.length) {
logger.trace({ newMutations, name }, 'recv new mutations')
} }
totalMutations.push(...newMutations)
} }
} }
} }
) )
processSyncActions(totalMutations) processSyncActions(appStateChunk.totalMutations)
return totalMutations return appStateChunk
} }
const resyncAppState = async(collections: WAPatchName[], returnSnapshot: boolean = false) => { const resyncAppState = async(collections: WAPatchName[], returnSnapshot: boolean = false) => {
let result: ChatMutation[] let result : AppStateChunk
try { try {
result = await resyncAppStateInternal(collections, false, returnSnapshot) result = await resyncAppStateInternal(collections, false, returnSnapshot)
} catch(error) { } catch(error) {
logger.info({ collections, error: error.stack }, 'failed to sync state from version, trying from scratch') logger.info({ collections, error: error.stack }, 'failed to sync state from version, trying from scratch')
result = await resyncAppStateInternal(collections, true, true) result = await resyncAppStateInternal(collections, true, true)
} }
return result return result
} }

View File

@@ -12,6 +12,8 @@ export interface PresenceData {
export type ChatMutation = { syncAction: proto.ISyncActionData, index: string[], indexMac: Uint8Array, valueMac: Uint8Array, operation: number } export type ChatMutation = { syncAction: proto.ISyncActionData, index: string[], indexMac: Uint8Array, valueMac: Uint8Array, operation: number }
export type AppStateChunk = { totalMutations : ChatMutation[], collectionsToHandle: WAPatchName[] }
export type WAPatchCreate = { export type WAPatchCreate = {
syncAction: proto.ISyncActionValue syncAction: proto.ISyncActionValue
index: string[] index: string[]

View File

@@ -272,7 +272,7 @@ export const extractSyncdPatches = async(result: BinaryNode) => {
const syncNode = getBinaryNodeChild(result, 'sync') const syncNode = getBinaryNodeChild(result, 'sync')
const collectionNodes = getBinaryNodeChildren(syncNode, 'collection') const collectionNodes = getBinaryNodeChildren(syncNode, 'collection')
const final = { } as { [T in WAPatchName]: { patches: proto.ISyncdPatch[], snapshot?: proto.ISyncdSnapshot } } const final = { } as { [T in WAPatchName]: { patches: proto.ISyncdPatch[], hasMorePatches: boolean, snapshot?: proto.ISyncdSnapshot } }
await Promise.all( await Promise.all(
collectionNodes.map( collectionNodes.map(
async collectionNode => { async collectionNode => {
@@ -283,6 +283,8 @@ export const extractSyncdPatches = async(result: BinaryNode) => {
const syncds: proto.ISyncdPatch[] = [] const syncds: proto.ISyncdPatch[] = []
const name = collectionNode.attrs.name as WAPatchName const name = collectionNode.attrs.name as WAPatchName
const hasMorePatches = collectionNode.attrs.has_more_patches == 'true'
let snapshot: proto.ISyncdSnapshot | undefined = undefined let snapshot: proto.ISyncdSnapshot | undefined = undefined
if(snapshotNode && !!snapshotNode.content) { if(snapshotNode && !!snapshotNode.content) {
@@ -309,7 +311,7 @@ export const extractSyncdPatches = async(result: BinaryNode) => {
} }
} }
final[name] = { patches: syncds, snapshot } final[name] = { patches: syncds, hasMorePatches, snapshot }
} }
) )
) )