diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index f1ee63f..6fe85c8 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -3,6 +3,7 @@ import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUse import { proto } from '../../WAProto' import { generateProfilePicture, toNumber, encodeSyncdPatch, decodePatches, extractSyncdPatches, chatModificationToAppPatch } from "../Utils"; import { makeMessagesRecvSocket } from "./messages-recv"; +import makeMutex from "../Utils/make-mutex"; export const makeChatsSocket = (config: SocketConfig) => { const { logger } = config @@ -17,6 +18,8 @@ export const makeChatsSocket = (config: SocketConfig) => { fetchPrivacySettings, } = sock + const mutationMutex = makeMutex() + const interactiveQuery = async(userNodes: BinaryNode[], queryNode: BinaryNode) => { const result = await query({ tag: 'iq', @@ -216,7 +219,6 @@ export const makeChatsSocket = (config: SocketConfig) => { processSyncActions(newMutations) } } - ev.emit('auth-state.update', authState) } @@ -354,59 +356,63 @@ export const makeChatsSocket = (config: SocketConfig) => { const appPatch = async(patchCreate: WAPatchCreate) => { const name = patchCreate.type - try { - await resyncAppState([name]) - } catch(error) { - logger.info({ name, error: error.stack }, 'failed to sync state from version, trying from scratch') - await resyncAppState([name], true) - } - - const { patch, state } = await encodeSyncdPatch( - patchCreate, - authState, - ) - const initial = await authState.keys.getAppStateSyncVersion(name) - // temp: verify it was encoded correctly - const result = await decodePatches(name, [{ ...patch, version: { version: state.version }, }], initial, authState) - - const node: BinaryNode = { - tag: 'iq', - attrs: { - to: S_WHATSAPP_NET, - type: 'set', - xmlns: 'w:sync:app:state' - }, - content: [ - { - tag: 'sync', - attrs: { }, + await mutationMutex.mutex( + async() => { + try { + await resyncAppState([name]) + } catch(error) { + logger.info({ name, error: error.stack }, 'failed to sync state from version, trying from scratch') + await resyncAppState([name], true) + } + const { patch, state } = await encodeSyncdPatch( + patchCreate, + authState, + ) + const initial = await authState.keys.getAppStateSyncVersion(name) + // temp: verify it was encoded correctly + const result = await decodePatches(name, [{ ...patch, version: { version: state.version }, }], initial, authState) + + const node: BinaryNode = { + tag: 'iq', + attrs: { + to: S_WHATSAPP_NET, + type: 'set', + xmlns: 'w:sync:app:state' + }, content: [ { - tag: 'collection', - attrs: { - name, - version: (state.version-1).toString(), - return_snapshot: 'false' - }, + tag: 'sync', + attrs: { }, content: [ { - tag: 'patch', - attrs: { }, - content: proto.SyncdPatch.encode(patch).finish() + tag: 'collection', + attrs: { + name, + version: (state.version-1).toString(), + return_snapshot: 'false' + }, + content: [ + { + tag: 'patch', + attrs: { }, + content: proto.SyncdPatch.encode(patch).finish() + } + ] } ] } ] } - ] - } - await query(node) - - await authState.keys.setAppStateSyncVersion(name, state) - ev.emit('auth-state.update', authState) - if(config.emitOwnEvents) { - processSyncActions(result.newMutations) - } + await query(node) + + await authState.keys.setAppStateSyncVersion(name, state) + ev.emit('auth-state.update', authState) + + if(config.emitOwnEvents) { + processSyncActions(result.newMutations) + } + } + ) } const chatModify = (mod: ChatModification, jid: string, lastMessages: Pick[]) => { @@ -434,8 +440,12 @@ export const makeChatsSocket = (config: SocketConfig) => { const update = getBinaryNodeChild(node, 'collection') if(update) { const name = update.attrs.name as WAPatchName - resyncAppState([name], false) - .catch(err => logger.error({ trace: err.stack, node }, `failed to sync state`)) + mutationMutex.mutex( + async() => { + await resyncAppState([name], false) + .catch(err => logger.error({ trace: err.stack, node }, `failed to sync state`)) + } + ) } }) @@ -444,8 +454,14 @@ export const makeChatsSocket = (config: SocketConfig) => { sendPresenceUpdate('available') fetchBlocklist() fetchPrivacySettings() - resyncAppState([ 'critical_block', 'critical_unblock_low' ]) - .catch(err => logger.info({ trace: err.stack }, 'failed to sync app state')) + mutationMutex.mutex( + async() => { + await ( + resyncAppState([ 'critical_block', 'critical_unblock_low' ]) + .catch(err => logger.info({ trace: err.stack }, 'failed to sync app state')) + ) + } + ) } }) diff --git a/src/Utils/make-mutex.ts b/src/Utils/make-mutex.ts new file mode 100644 index 0000000..76f4148 --- /dev/null +++ b/src/Utils/make-mutex.ts @@ -0,0 +1,19 @@ + +export default () => { + let task = Promise.resolve() as Promise + return { + mutex(code: () => Promise):Promise { + task = (async () => { + // wait for the previous task to complete + // if there is an error, we swallow so as to not block the queue + try { await task } catch { } + // execute the current task + return code() + })() + // we replace the existing task, appending the new piece of execution to it + // so the next task will have to wait for this one to finish + return task + }, + } + } + \ No newline at end of file