feat: implement external patch parsing + app state sync on login

This commit is contained in:
Adhiraj Singh
2021-11-08 15:07:25 +05:30
parent 3c6edde1d6
commit ba453a588b
6 changed files with 88 additions and 86 deletions

View File

@@ -167,7 +167,16 @@ export const makeChatsSocket = (config: SocketConfig) => {
})
}
const collectionSync = async(collections: { name: WAPatchName, version: number }[]) => {
const resyncAppState = async(collections: WAPatchName[], fromScratch: boolean = false, returnSnapshot: boolean = false) => {
const states = { } as { [T in WAPatchName]: LTHashState }
for(const name of collections) {
let state: LTHashState = fromScratch ? undefined : await authState.keys.getAppStateSyncVersion(name)
if(!state) state = { version: 0, hash: Buffer.alloc(128), mutations: [] }
states[name] = state
logger.info(`resyncing ${name} from v${state.version}`)
}
const result = await query({
tag: 'iq',
attrs: {
@@ -180,25 +189,35 @@ export const makeChatsSocket = (config: SocketConfig) => {
tag: 'sync',
attrs: { },
content: collections.map(
({ name, version }) => ({
(name) => ({
tag: 'collection',
attrs: { name, version: version.toString(), return_snapshot: 'true' }
attrs: {
name,
version: states[name].version.toString(),
return_snapshot: returnSnapshot ? 'true' : 'false'
}
})
)
}
]
})
const syncNode = getBinaryNodeChild(result, 'sync')
const collectionNodes = getBinaryNodeChildren(syncNode, 'collection')
return collectionNodes.reduce(
(dict, node) => {
const snapshotNode = getBinaryNodeChild(node, 'snapshot')
if(snapshotNode) {
dict[node.attrs.name] = snapshotNode.content as Uint8Array
}
return dict
}, { } as { [P in WAPatchName]: Uint8Array }
)
const decoded = extractSyncdPatches(result) // extract from binary node
for(const key in decoded) {
const name = key as WAPatchName
// only process if there are syncd patches
if(decoded[name].length) {
const { newMutations, state: newState } = await decodePatches(name, decoded[name], states[name], authState, true)
await authState.keys.setAppStateSyncVersion(name, newState)
logger.info(`synced ${name} to v${newState.version}`)
processSyncActions(newMutations)
}
}
ev.emit('auth-state.update', authState)
}
/**
@@ -282,7 +301,6 @@ export const makeChatsSocket = (config: SocketConfig) => {
}
const processSyncActions = (actions: ChatMutation[]) => {
const updates: { [jid: string]: Partial<Chat> } = {}
const contactUpdates: { [jid: string]: Contact } = {}
const msgDeletes: proto.IMessageKey[] = []
@@ -337,10 +355,10 @@ export const makeChatsSocket = (config: SocketConfig) => {
const appPatch = async(patchCreate: WAPatchCreate) => {
const name = patchCreate.type
try {
await resyncState(name, false)
await resyncAppState([name])
} catch(error) {
logger.info({ name, error: error.stack }, 'failed to sync state from version, trying from scratch')
await resyncState(name, true)
await resyncAppState([name], true)
}
const { patch, state } = await encodeSyncdPatch(
@@ -349,7 +367,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
)
const initial = await authState.keys.getAppStateSyncVersion(name)
// temp: verify it was encoded correctly
const result = await decodePatches({ syncds: [{ ...patch, version: { version: state.version }, }], name }, initial, authState)
const result = await decodePatches(name, [{ ...patch, version: { version: state.version }, }], initial, authState)
const node: BinaryNode = {
tag: 'iq',
@@ -396,52 +414,6 @@ export const makeChatsSocket = (config: SocketConfig) => {
return appPatch(patch)
}
const fetchAppState = async(name: WAPatchName, fromVersion: number) => {
const result = await query({
tag: 'iq',
attrs: {
type: 'set',
xmlns: 'w:sync:app:state',
to: S_WHATSAPP_NET
},
content: [
{
tag: 'sync',
attrs: { },
content: [
{
tag: 'collection',
attrs: {
name,
version: fromVersion.toString(),
return_snapshot: 'false'
}
}
]
}
]
})
return result
}
const resyncState = async(name: WAPatchName, fromScratch: boolean) => {
let state: LTHashState = fromScratch ? undefined : await authState.keys.getAppStateSyncVersion(name)
if(!state) state = { version: 0, hash: Buffer.alloc(128), mutations: [] }
logger.info(`resyncing ${name} from v${state.version}`)
const result = await fetchAppState(name, state.version)
const decoded = extractSyncdPatches(result) // extract from binary node
const { newMutations, state: newState } = await decodePatches(decoded, state, authState, true)
await authState.keys.setAppStateSyncVersion(name, newState)
logger.info(`synced ${name} to v${newState.version}`)
processSyncActions(newMutations)
ev.emit('auth-state.update', authState)
}
ws.on('CB:presence', handlePresenceUpdate)
ws.on('CB:chatstate', handlePresenceUpdate)
@@ -461,7 +433,8 @@ export const makeChatsSocket = (config: SocketConfig) => {
ws.on('CB:notification,type:server_sync', (node: BinaryNode) => {
const update = getBinaryNodeChild(node, 'collection')
if(update) {
resyncState(update.attrs.name as WAPatchName, false)
const name = update.attrs.name as WAPatchName
resyncAppState([name], false)
.catch(err => logger.error({ trace: err.stack, node }, `failed to sync state`))
}
})
@@ -471,6 +444,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
sendPresenceUpdate('available')
fetchBlocklist()
fetchPrivacySettings()
resyncAppState([ 'critical_block', 'critical_unblock_low' ])
}
})
@@ -485,7 +459,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
fetchStatus,
updateProfilePicture,
updateBlockStatus,
resyncState,
resyncAppState,
chatModify,
}
}