fix: mutex app state sync to prevent race conditions that log connection out

This commit is contained in:
Adhiraj Singh
2021-11-18 11:30:35 +05:30
parent 40280d777d
commit bb5f13d188
2 changed files with 84 additions and 49 deletions

View File

@@ -3,6 +3,7 @@ import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUse
import { proto } from '../../WAProto' import { proto } from '../../WAProto'
import { generateProfilePicture, toNumber, encodeSyncdPatch, decodePatches, extractSyncdPatches, chatModificationToAppPatch } from "../Utils"; import { generateProfilePicture, toNumber, encodeSyncdPatch, decodePatches, extractSyncdPatches, chatModificationToAppPatch } from "../Utils";
import { makeMessagesRecvSocket } from "./messages-recv"; import { makeMessagesRecvSocket } from "./messages-recv";
import makeMutex from "../Utils/make-mutex";
export const makeChatsSocket = (config: SocketConfig) => { export const makeChatsSocket = (config: SocketConfig) => {
const { logger } = config const { logger } = config
@@ -17,6 +18,8 @@ export const makeChatsSocket = (config: SocketConfig) => {
fetchPrivacySettings, fetchPrivacySettings,
} = sock } = sock
const mutationMutex = makeMutex()
const interactiveQuery = async(userNodes: BinaryNode[], queryNode: BinaryNode) => { const interactiveQuery = async(userNodes: BinaryNode[], queryNode: BinaryNode) => {
const result = await query({ const result = await query({
tag: 'iq', tag: 'iq',
@@ -216,7 +219,6 @@ export const makeChatsSocket = (config: SocketConfig) => {
processSyncActions(newMutations) processSyncActions(newMutations)
} }
} }
ev.emit('auth-state.update', authState) ev.emit('auth-state.update', authState)
} }
@@ -354,59 +356,63 @@ export const makeChatsSocket = (config: SocketConfig) => {
const appPatch = async(patchCreate: WAPatchCreate) => { const appPatch = async(patchCreate: WAPatchCreate) => {
const name = patchCreate.type const name = patchCreate.type
try { await mutationMutex.mutex(
await resyncAppState([name]) async() => {
} catch(error) { try {
logger.info({ name, error: error.stack }, 'failed to sync state from version, trying from scratch') await resyncAppState([name])
await resyncAppState([name], true) } catch(error) {
} logger.info({ name, error: error.stack }, 'failed to sync state from version, trying from scratch')
await resyncAppState([name], true)
const { patch, state } = await encodeSyncdPatch( }
patchCreate, const { patch, state } = await encodeSyncdPatch(
authState, patchCreate,
) authState,
const initial = await authState.keys.getAppStateSyncVersion(name) )
// temp: verify it was encoded correctly const initial = await authState.keys.getAppStateSyncVersion(name)
const result = await decodePatches(name, [{ ...patch, version: { version: state.version }, }], initial, authState) // temp: verify it was encoded correctly
const result = await decodePatches(name, [{ ...patch, version: { version: state.version }, }], initial, authState)
const node: BinaryNode = {
tag: 'iq', const node: BinaryNode = {
attrs: { tag: 'iq',
to: S_WHATSAPP_NET, attrs: {
type: 'set', to: S_WHATSAPP_NET,
xmlns: 'w:sync:app:state' type: 'set',
}, xmlns: 'w:sync:app:state'
content: [ },
{
tag: 'sync',
attrs: { },
content: [ content: [
{ {
tag: 'collection', tag: 'sync',
attrs: { attrs: { },
name,
version: (state.version-1).toString(),
return_snapshot: 'false'
},
content: [ content: [
{ {
tag: 'patch', tag: 'collection',
attrs: { }, attrs: {
content: proto.SyncdPatch.encode(patch).finish() name,
version: (state.version-1).toString(),
return_snapshot: 'false'
},
content: [
{
tag: 'patch',
attrs: { },
content: proto.SyncdPatch.encode(patch).finish()
}
]
} }
] ]
} }
] ]
} }
] await query(node)
}
await query(node) await authState.keys.setAppStateSyncVersion(name, state)
ev.emit('auth-state.update', authState)
await authState.keys.setAppStateSyncVersion(name, state)
ev.emit('auth-state.update', authState) if(config.emitOwnEvents) {
if(config.emitOwnEvents) { processSyncActions(result.newMutations)
processSyncActions(result.newMutations) }
} }
)
} }
const chatModify = (mod: ChatModification, jid: string, lastMessages: Pick<proto.IWebMessageInfo, 'key' | 'messageTimestamp'>[]) => { const chatModify = (mod: ChatModification, jid: string, lastMessages: Pick<proto.IWebMessageInfo, 'key' | 'messageTimestamp'>[]) => {
@@ -434,8 +440,12 @@ export const makeChatsSocket = (config: SocketConfig) => {
const update = getBinaryNodeChild(node, 'collection') const update = getBinaryNodeChild(node, 'collection')
if(update) { if(update) {
const name = update.attrs.name as WAPatchName const name = update.attrs.name as WAPatchName
resyncAppState([name], false) mutationMutex.mutex(
.catch(err => logger.error({ trace: err.stack, node }, `failed to sync state`)) async() => {
await resyncAppState([name], false)
.catch(err => logger.error({ trace: err.stack, node }, `failed to sync state`))
}
)
} }
}) })
@@ -444,8 +454,14 @@ export const makeChatsSocket = (config: SocketConfig) => {
sendPresenceUpdate('available') sendPresenceUpdate('available')
fetchBlocklist() fetchBlocklist()
fetchPrivacySettings() fetchPrivacySettings()
resyncAppState([ 'critical_block', 'critical_unblock_low' ]) mutationMutex.mutex(
.catch(err => logger.info({ trace: err.stack }, 'failed to sync app state')) async() => {
await (
resyncAppState([ 'critical_block', 'critical_unblock_low' ])
.catch(err => logger.info({ trace: err.stack }, 'failed to sync app state'))
)
}
)
} }
}) })

19
src/Utils/make-mutex.ts Normal file
View File

@@ -0,0 +1,19 @@
export default () => {
let task = Promise.resolve() as Promise<any>
return {
mutex<T>(code: () => Promise<T>):Promise<T> {
task = (async () => {
// wait for the previous task to complete
// if there is an error, we swallow so as to not block the queue
try { await task } catch { }
// execute the current task
return code()
})()
// we replace the existing task, appending the new piece of execution to it
// so the next task will have to wait for this one to finish
return task
},
}
}