diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index 3820be8..e37753c 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -313,8 +313,7 @@ export const makeChatsSocket = (config: SocketConfig) => { } } - const resyncAppState = async(collections: readonly WAPatchName[], recvChats: InitialReceivedChatsState | undefined) => { - const startedBuffer = ev.buffer() + const resyncAppState = ev.createBufferedFunction(async(collections: readonly WAPatchName[], recvChats: InitialReceivedChatsState | undefined) => { const { onMutation } = newAppStateChunkHandler(recvChats) // we use this to determine which events to fire // otherwise when we resync from scratch -- all notifications will fire @@ -426,12 +425,7 @@ export const makeChatsSocket = (config: SocketConfig) => { } } ) - - // flush everything if we started the buffer here - if(startedBuffer) { - await ev.flush() - } - } + }) /** * fetch the profile picture of a user/group @@ -700,8 +694,7 @@ export const makeChatsSocket = (config: SocketConfig) => { ]) } - const upsertMessage = async(msg: WAMessage, type: MessageUpsertType) => { - const startedBuffer = ev.buffer() + const upsertMessage = ev.createBufferedFunction(async(msg: WAMessage, type: MessageUpsertType) => { ev.emit('messages.upsert', { messages: [msg], type }) if(!!msg.pushName) { @@ -739,11 +732,7 @@ export const makeChatsSocket = (config: SocketConfig) => { logger.debug('restarting app sync timeout') appStateSyncTimeout.start() } - - if(startedBuffer) { - await ev.flush() - } - } + }) ws.on('CB:presence', handlePresenceUpdate) ws.on('CB:chatstate', handlePresenceUpdate) diff --git a/src/Socket/groups.ts b/src/Socket/groups.ts index c675e81..08d4a43 100644 --- a/src/Socket/groups.ts +++ b/src/Socket/groups.ts @@ -147,7 +147,7 @@ export const makeGroupsSocket = (config: SocketConfig) => { * @param key the key of the invite message, or optionally only provide the jid of the person who sent the invite * @param inviteMessage the message to accept */ - groupAcceptInviteV4: async(key: string | WAMessageKey, inviteMessage: proto.Message.IGroupInviteMessage) => { + groupAcceptInviteV4: ev.createBufferedFunction(async(key: string | WAMessageKey, inviteMessage: proto.Message.IGroupInviteMessage) => { key = typeof key === 'string' ? { remoteJid: key } : key const results = await groupQuery(inviteMessage.groupJid!, 'set', [{ tag: 'accept', @@ -158,7 +158,6 @@ export const makeGroupsSocket = (config: SocketConfig) => { } }]) - const started = ev.buffer() // if we have the full message key // update the invite message to be expired if(key.id) { @@ -197,12 +196,8 @@ export const makeGroupsSocket = (config: SocketConfig) => { 'notify' ) - if(started) { - await ev.flush() - } - return results.attrs.from - }, + }), groupGetInviteInfo: async(code: string) => { const results = await groupQuery('@g.us', 'get', [{ tag: 'invite', attrs: { code } }]) return extractGroupMetadata(results) diff --git a/src/Utils/event-buffer.ts b/src/Utils/event-buffer.ts index 6091b8e..62ae013 100644 --- a/src/Utils/event-buffer.ts +++ b/src/Utils/event-buffer.ts @@ -40,6 +40,8 @@ type BaileysBufferableEventEmitter = BaileysEventEmitter & { * @returns true if buffering just started, false if it was already buffering * */ buffer(): boolean + /** buffers all events till the promise completes */ + createBufferedFunction(work: (...args: A) => Promise): ((...args: A) => Promise) /** flushes all buffered events */ flush(): Promise /** waits for the task to complete, before releasing the buffer */ @@ -66,6 +68,36 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter = } }) + function buffer() { + if(!isBuffering) { + logger.trace('buffering events') + isBuffering = true + return true + } + + return false + } + + async function flush() { + if(!isBuffering) { + return + } + + logger.trace('releasing buffered events...') + await preBufferTask + + isBuffering = false + + const consolidatedData = consolidateEvents(data) + if(Object.keys(consolidatedData).length) { + ev.emit('event', consolidatedData) + } + + data = makeBufferData() + + logger.trace('released buffered events') + } + return { process(handler) { const listener = (map: BaileysEventData) => { @@ -90,33 +122,20 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter = preBufferTask = Promise.allSettled([ preBufferTask, task ]) } }, - buffer() { - if(!isBuffering) { - logger.trace('buffering events') - isBuffering = true - return true + buffer, + flush, + createBufferedFunction(work) { + return async(...args) => { + const started = buffer() + try { + const result = await work(...args) + return result + } finally { + if(started) { + await flush() + } + } } - - return false - }, - async flush() { - if(!isBuffering) { - return - } - - logger.trace('releasing buffered events...') - await preBufferTask - - isBuffering = false - - const consolidatedData = consolidateEvents(data) - if(Object.keys(consolidatedData).length) { - ev.emit('event', consolidatedData) - } - - data = makeBufferData() - - logger.trace('released buffered events') }, on: (...args) => ev.on(...args), off: (...args) => ev.off(...args),