refactor: app state handling

1. fixes snapshot patches not being included
2. fixes all mutations being passed when syncing from scratch
3. simpler chat mutation model
4. do not retry if key is not found
This commit is contained in:
Adhiraj Singh
2022-01-07 14:29:52 +05:30
parent da65ae8f42
commit d15bde5d17
3 changed files with 60 additions and 36 deletions

View File

@@ -143,7 +143,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
await query({ await query({
tag: 'iq', tag: 'iq',
attrs: { attrs: {
xmlns: 'blocklist', xmlns: 'blocklist',
to: S_WHATSAPP_NET, to: S_WHATSAPP_NET,
type: 'set' type: 'set'
}, },
@@ -225,8 +225,11 @@ export const makeChatsSocket = (config: SocketConfig) => {
}) })
} }
const resyncAppState = async(collections: WAPatchName[], fromScratch: boolean = false) => { const resyncAppState = async(collections: WAPatchName[]) => {
const appStateChunk : AppStateChunk = {totalMutations: [], collectionsToHandle: []} const appStateChunk: AppStateChunk = {totalMutations: [], collectionsToHandle: []}
// we use this to determine which events to fire
// otherwise when we resync from scratch -- all notifications will fire
const initialVersionMap: { [T in WAPatchName]?: number } = { }
await authState.keys.transaction( await authState.keys.transaction(
async() => { async() => {
@@ -241,12 +244,16 @@ export const makeChatsSocket = (config: SocketConfig) => {
const nodes: BinaryNode[] = [] const nodes: BinaryNode[] = []
for(const name of collectionsToHandle) { for(const name of collectionsToHandle) {
let state: LTHashState const result = await authState.keys.get('app-state-sync-version', [name])
if(!fromScratch) { let state = result[name]
const result = await authState.keys.get('app-state-sync-version', [name])
state = result[name] if(state) {
if(typeof initialVersionMap[name] === 'undefined') {
initialVersionMap[name] = state.version
}
} else {
state = newLTHashState()
} }
if(!state) state = newLTHashState()
states[name] = state states[name] = state
@@ -285,16 +292,18 @@ export const makeChatsSocket = (config: SocketConfig) => {
const { patches, hasMorePatches, snapshot } = decoded[name] const { patches, hasMorePatches, snapshot } = decoded[name]
try { try {
if(snapshot) { if(snapshot) {
const newState = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey) const { state: newState, mutations } = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey, initialVersionMap[name])
states[name] = newState states[name] = newState
logger.info(`restored state of ${name} from snapshot to v${newState.version}`) logger.info(`restored state of ${name} from snapshot to v${newState.version} with ${mutations.length} mutations`)
await authState.keys.set({ 'app-state-sync-version': { [name]: newState } }) await authState.keys.set({ 'app-state-sync-version': { [name]: newState } })
appStateChunk.totalMutations.push(...mutations)
} }
// only process if there are syncd patches // only process if there are syncd patches
if(patches.length) { if(patches.length) {
const { newMutations, state: newState } = await decodePatches(name, patches, states[name], getAppStateSyncKey, true) const { newMutations, state: newState } = await decodePatches(name, patches, states[name], getAppStateSyncKey, initialVersionMap[name])
await authState.keys.set({ 'app-state-sync-version': { [name]: newState } }) await authState.keys.set({ 'app-state-sync-version': { [name]: newState } })
@@ -313,9 +322,12 @@ export const makeChatsSocket = (config: SocketConfig) => {
} catch(error) { } catch(error) {
logger.info({ name, error: error.stack }, 'failed to sync state from version, removing and trying from scratch') logger.info({ name, error: error.stack }, 'failed to sync state from version, removing and trying from scratch')
await authState.keys.set({ "app-state-sync-version": { [name]: null } }) await authState.keys.set({ "app-state-sync-version": { [name]: null } })
// increment number of retries
attemptsMap[name] = (attemptsMap[name] || 0) + 1 attemptsMap[name] = (attemptsMap[name] || 0) + 1
if(attemptsMap[name] >= MAX_SYNC_ATTEMPTS) { // if retry attempts overshoot
// or key not found
if(attemptsMap[name] >= MAX_SYNC_ATTEMPTS || error.output?.statusCode === 404) {
// stop retrying
collectionsToHandle.delete(name) collectionsToHandle.delete(name)
} }
} }
@@ -431,6 +443,8 @@ export const makeChatsSocket = (config: SocketConfig) => {
} }
const processSyncActions = (actions: ChatMutation[]) => { const processSyncActions = (actions: ChatMutation[]) => {
console.log(actions)
const updates: { [jid: string]: Partial<Chat> } = {} const updates: { [jid: string]: Partial<Chat> } = {}
const contactUpdates: { [jid: string]: Contact } = {} const contactUpdates: { [jid: string]: Contact } = {}
const msgDeletes: proto.IMessageKey[] = [] const msgDeletes: proto.IMessageKey[] = []
@@ -637,7 +651,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
const name = update.attrs.name as WAPatchName const name = update.attrs.name as WAPatchName
mutationMutex.mutex( mutationMutex.mutex(
async() => { async() => {
await resyncAppState([name], false) await resyncAppState([name])
.catch(err => logger.error({ trace: err.stack, node }, `failed to sync state`)) .catch(err => logger.error({ trace: err.stack, node }, `failed to sync state`))
} }
) )

View File

@@ -10,7 +10,10 @@ export interface PresenceData {
lastSeen?: number lastSeen?: number
} }
export type ChatMutation = { syncAction: proto.ISyncActionData, index: string[], indexMac: Uint8Array, valueMac: Uint8Array, operation: number } export type ChatMutation = {
syncAction: proto.ISyncActionData
index: string[]
}
export type AppStateChunk = { totalMutations : ChatMutation[], collectionsToHandle: WAPatchName[] } export type AppStateChunk = { totalMutations : ChatMutation[], collectionsToHandle: WAPatchName[] }

View File

@@ -187,7 +187,7 @@ export const decodeSyncdMutations = async(
if(!key) { if(!key) {
const keyEnc = await getAppStateSyncKey(base64Key) const keyEnc = await getAppStateSyncKey(base64Key)
if(!keyEnc) { if(!keyEnc) {
throw new Boom(`failed to find key "${base64Key}" to decode mutation`, { statusCode: 500, data: { msgMutations } }) throw new Boom(`failed to find key "${base64Key}" to decode mutation`, { statusCode: 404, data: { msgMutations } })
} }
const result = mutationKeys(keyEnc.keyData!) const result = mutationKeys(keyEnc.keyData!)
keyCache[base64Key] = result keyCache[base64Key] = result
@@ -230,16 +230,15 @@ export const decodeSyncdMutations = async(
} }
const indexStr = Buffer.from(syncAction.index).toString() const indexStr = Buffer.from(syncAction.index).toString()
const mutation: ChatMutation = { mutations.push({
syncAction, syncAction,
index: JSON.parse(indexStr), index: JSON.parse(indexStr),
})
ltGenerator.mix({
indexMac: record.index!.blob!, indexMac: record.index!.blob!,
valueMac: ogValueMac, valueMac: ogValueMac,
operation: operation, operation: operation
} })
mutations.push(mutation)
ltGenerator.mix(mutation)
} }
return { mutations, ...ltGenerator.finish() } return { mutations, ...ltGenerator.finish() }
@@ -339,20 +338,15 @@ export const decodeSyncdSnapshot = async(
name: WAPatchName, name: WAPatchName,
snapshot: proto.ISyncdSnapshot, snapshot: proto.ISyncdSnapshot,
getAppStateSyncKey: FetchAppStateSyncKey, getAppStateSyncKey: FetchAppStateSyncKey,
minimumVersionNumber: number | undefined,
validateMacs: boolean = true validateMacs: boolean = true
) => { ) => {
const newState = newLTHashState() const newState = newLTHashState()
newState.version = toNumber(snapshot.version!.version!) newState.version = toNumber(snapshot.version!.version!)
const records = snapshot.records! let { hash, indexValueMap, mutations } = await decodeSyncdMutations(snapshot.records!, newState, getAppStateSyncKey, validateMacs)
newState.hash = hash
const ltGenerator = makeLtHashGenerator(newState) newState.indexValueMap = indexValueMap
for(const { index, value } of records) {
const valueMac = value.blob!.slice(-32)!
ltGenerator.mix({ indexMac: index.blob!, valueMac, operation: 0 })
}
Object.assign(newState, ltGenerator.finish())
if(validateMacs) { if(validateMacs) {
const base64Key = Buffer.from(snapshot.keyId!.id!).toString('base64') const base64Key = Buffer.from(snapshot.keyId!.id!).toString('base64')
@@ -367,7 +361,15 @@ export const decodeSyncdSnapshot = async(
} }
} }
return newState const areMutationsRequired = typeof minimumVersionNumber === 'undefined' || newState.version > minimumVersionNumber
if(!areMutationsRequired) {
mutations = []
}
return {
state: newState,
mutations
}
} }
export const decodePatches = async( export const decodePatches = async(
@@ -375,6 +377,7 @@ export const decodePatches = async(
syncds: proto.ISyncdPatch[], syncds: proto.ISyncdPatch[],
initial: LTHashState, initial: LTHashState,
getAppStateSyncKey: FetchAppStateSyncKey, getAppStateSyncKey: FetchAppStateSyncKey,
minimumVersionNumber?: number,
validateMacs: boolean = true validateMacs: boolean = true
) => { ) => {
const successfulMutations: ChatMutation[] = [] const successfulMutations: ChatMutation[] = []
@@ -391,13 +394,17 @@ export const decodePatches = async(
syncd.mutations.push(...ref.mutations) syncd.mutations.push(...ref.mutations)
} }
newState.version = toNumber(version.version!) const patchVersion = toNumber(version.version!)
newState.version = patchVersion
const decodeResult = await decodeSyncdPatch(syncd, name, newState, getAppStateSyncKey, validateMacs) const decodeResult = await decodeSyncdPatch(syncd, name, newState, getAppStateSyncKey, validateMacs)
newState.hash = decodeResult.hash newState.hash = decodeResult.hash
newState.indexValueMap = decodeResult.indexValueMap newState.indexValueMap = decodeResult.indexValueMap
successfulMutations.push(...decodeResult.mutations) if(typeof minimumVersionNumber === 'undefined' || patchVersion > minimumVersionNumber) {
successfulMutations.push(...decodeResult.mutations)
}
if(validateMacs) { if(validateMacs) {
const base64Key = Buffer.from(keyId!.id!).toString('base64') const base64Key = Buffer.from(keyId!.id!).toString('base64')