fix: handle when buffered tasks fail

This commit is contained in:
Adhiraj Singh
2022-08-18 10:15:27 +05:30
parent dc07d31dc7
commit 8d6e00eab6
3 changed files with 51 additions and 48 deletions

View File

@@ -313,8 +313,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
} }
} }
const resyncAppState = async(collections: readonly WAPatchName[], recvChats: InitialReceivedChatsState | undefined) => { const resyncAppState = ev.createBufferedFunction(async(collections: readonly WAPatchName[], recvChats: InitialReceivedChatsState | undefined) => {
const startedBuffer = ev.buffer()
const { onMutation } = newAppStateChunkHandler(recvChats) const { onMutation } = newAppStateChunkHandler(recvChats)
// we use this to determine which events to fire // we use this to determine which events to fire
// otherwise when we resync from scratch -- all notifications will fire // otherwise when we resync from scratch -- all notifications will fire
@@ -426,12 +425,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
} }
} }
) )
})
// flush everything if we started the buffer here
if(startedBuffer) {
await ev.flush()
}
}
/** /**
* fetch the profile picture of a user/group * fetch the profile picture of a user/group
@@ -700,8 +694,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
]) ])
} }
const upsertMessage = async(msg: WAMessage, type: MessageUpsertType) => { const upsertMessage = ev.createBufferedFunction(async(msg: WAMessage, type: MessageUpsertType) => {
const startedBuffer = ev.buffer()
ev.emit('messages.upsert', { messages: [msg], type }) ev.emit('messages.upsert', { messages: [msg], type })
if(!!msg.pushName) { if(!!msg.pushName) {
@@ -739,11 +732,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
logger.debug('restarting app sync timeout') logger.debug('restarting app sync timeout')
appStateSyncTimeout.start() appStateSyncTimeout.start()
} }
})
if(startedBuffer) {
await ev.flush()
}
}
ws.on('CB:presence', handlePresenceUpdate) ws.on('CB:presence', handlePresenceUpdate)
ws.on('CB:chatstate', handlePresenceUpdate) ws.on('CB:chatstate', handlePresenceUpdate)

View File

@@ -147,7 +147,7 @@ export const makeGroupsSocket = (config: SocketConfig) => {
* @param key the key of the invite message, or optionally only provide the jid of the person who sent the invite * @param key the key of the invite message, or optionally only provide the jid of the person who sent the invite
* @param inviteMessage the message to accept * @param inviteMessage the message to accept
*/ */
groupAcceptInviteV4: async(key: string | WAMessageKey, inviteMessage: proto.Message.IGroupInviteMessage) => { groupAcceptInviteV4: ev.createBufferedFunction(async(key: string | WAMessageKey, inviteMessage: proto.Message.IGroupInviteMessage) => {
key = typeof key === 'string' ? { remoteJid: key } : key key = typeof key === 'string' ? { remoteJid: key } : key
const results = await groupQuery(inviteMessage.groupJid!, 'set', [{ const results = await groupQuery(inviteMessage.groupJid!, 'set', [{
tag: 'accept', tag: 'accept',
@@ -158,7 +158,6 @@ export const makeGroupsSocket = (config: SocketConfig) => {
} }
}]) }])
const started = ev.buffer()
// if we have the full message key // if we have the full message key
// update the invite message to be expired // update the invite message to be expired
if(key.id) { if(key.id) {
@@ -197,12 +196,8 @@ export const makeGroupsSocket = (config: SocketConfig) => {
'notify' 'notify'
) )
if(started) {
await ev.flush()
}
return results.attrs.from return results.attrs.from
}, }),
groupGetInviteInfo: async(code: string) => { groupGetInviteInfo: async(code: string) => {
const results = await groupQuery('@g.us', 'get', [{ tag: 'invite', attrs: { code } }]) const results = await groupQuery('@g.us', 'get', [{ tag: 'invite', attrs: { code } }])
return extractGroupMetadata(results) return extractGroupMetadata(results)

View File

@@ -40,6 +40,8 @@ type BaileysBufferableEventEmitter = BaileysEventEmitter & {
* @returns true if buffering just started, false if it was already buffering * @returns true if buffering just started, false if it was already buffering
* */ * */
buffer(): boolean buffer(): boolean
/** buffers all events till the promise completes */
createBufferedFunction<A extends any[], T>(work: (...args: A) => Promise<T>): ((...args: A) => Promise<T>)
/** flushes all buffered events */ /** flushes all buffered events */
flush(): Promise<void> flush(): Promise<void>
/** waits for the task to complete, before releasing the buffer */ /** waits for the task to complete, before releasing the buffer */
@@ -66,6 +68,36 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter =
} }
}) })
function buffer() {
if(!isBuffering) {
logger.trace('buffering events')
isBuffering = true
return true
}
return false
}
async function flush() {
if(!isBuffering) {
return
}
logger.trace('releasing buffered events...')
await preBufferTask
isBuffering = false
const consolidatedData = consolidateEvents(data)
if(Object.keys(consolidatedData).length) {
ev.emit('event', consolidatedData)
}
data = makeBufferData()
logger.trace('released buffered events')
}
return { return {
process(handler) { process(handler) {
const listener = (map: BaileysEventData) => { const listener = (map: BaileysEventData) => {
@@ -90,33 +122,20 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter =
preBufferTask = Promise.allSettled([ preBufferTask, task ]) preBufferTask = Promise.allSettled([ preBufferTask, task ])
} }
}, },
buffer() { buffer,
if(!isBuffering) { flush,
logger.trace('buffering events') createBufferedFunction(work) {
isBuffering = true return async(...args) => {
return true const started = buffer()
try {
const result = await work(...args)
return result
} finally {
if(started) {
await flush()
}
}
} }
return false
},
async flush() {
if(!isBuffering) {
return
}
logger.trace('releasing buffered events...')
await preBufferTask
isBuffering = false
const consolidatedData = consolidateEvents(data)
if(Object.keys(consolidatedData).length) {
ev.emit('event', consolidatedData)
}
data = makeBufferData()
logger.trace('released buffered events')
}, },
on: (...args) => ev.on(...args), on: (...args) => ev.on(...args),
off: (...args) => ev.off(...args), off: (...args) => ev.off(...args),