refactor: implement counter-based event buffer

1. the counter-based event buffer keeps track of the number of blocks that request event processing in the buffer
2. the event buffer only releases events when the last block completes (i.e. when the counter reaches 0)

this approach is far simpler than the promise-based garbled crap I wrote; it should also prevent the deadlock issues that approach introduced 🙏
This commit is contained in:
Adhiraj Singh
2022-12-02 11:31:42 +05:30
parent b520d81968
commit 30e2cb5c4c
5 changed files with 78 additions and 136 deletions

View File

@@ -822,7 +822,8 @@ export const makeChatsSocket = (config: SocketConfig) => {
// we keep buffering events until we finally have // we keep buffering events until we finally have
// the key and can sync the messages // the key and can sync the messages
if(!authState.creds?.myAppStateKeyId) { if(!authState.creds?.myAppStateKeyId) {
needToFlushWithAppStateSync = ev.buffer() ev.buffer()
needToFlushWithAppStateSync = true
} }
} }
}) })

View File

@@ -645,16 +645,9 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
identifier: string, identifier: string,
exec: (node: BinaryNode) => Promise<any> exec: (node: BinaryNode) => Promise<any>
) => { ) => {
const started = ev.buffer() ev.buffer()
if(started) { await execTask()
await execTask() ev.flush()
if(started) {
await ev.flush()
}
} else {
const task = execTask()
ev.processInBuffer(task)
}
function execTask() { function execTask() {
return exec(node) return exec(node)
@@ -662,17 +655,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
} }
} }
// called when all offline notifs are handled
ws.on('CB:ib,,offline', async(node: BinaryNode) => {
const child = getBinaryNodeChild(node, 'offline')
const offlineNotifs = +(child?.attrs.count || 0)
logger.info(`handled ${offlineNotifs} offline messages/notifications`)
await ev.flush()
ev.emit('connection.update', { receivedPendingNotifications: true })
})
// recv a message // recv a message
ws.on('CB:message', (node: BinaryNode) => { ws.on('CB:message', (node: BinaryNode) => {
processNodeWithBuffer(node, 'processing message', handleMessage) processNodeWithBuffer(node, 'processing message', handleMessage)

View File

@@ -532,15 +532,32 @@ export const makeSocket = ({
end(new Boom('Multi-device beta not joined', { statusCode: DisconnectReason.multideviceMismatch })) end(new Boom('Multi-device beta not joined', { statusCode: DisconnectReason.multideviceMismatch }))
}) })
let didStartBuffer = false
process.nextTick(() => { process.nextTick(() => {
if(creds.me?.id) { if(creds.me?.id) {
// start buffering important events // start buffering important events
// if we're logged in // if we're logged in
ev.buffer() ev.buffer()
didStartBuffer = true
} }
ev.emit('connection.update', { connection: 'connecting', receivedPendingNotifications: false, qr: undefined }) ev.emit('connection.update', { connection: 'connecting', receivedPendingNotifications: false, qr: undefined })
}) })
// called when all offline notifs are handled
ws.on('CB:ib,,offline', (node: BinaryNode) => {
const child = getBinaryNodeChild(node, 'offline')
const offlineNotifs = +(child?.attrs.count || 0)
logger.info(`handled ${offlineNotifs} offline messages/notifications`)
if(didStartBuffer) {
ev.flush()
logger.trace('flushed events for initial buffer')
}
ev.emit('connection.update', { receivedPendingNotifications: true })
})
// update credentials when required // update credentials when required
ev.on('creds.update', update => { ev.on('creds.update', update => {
const name = update.me?.name const name = update.me?.name

View File

@@ -22,16 +22,25 @@ describe('Event Buffer Tests', () => {
ev.on('chats.update', () => fail('should not emit update event')) ev.on('chats.update', () => fail('should not emit update event'))
ev.buffer() ev.buffer()
ev.processInBuffer((async() => { await Promise.all([
await delay(100) (async() => {
ev.emit('chats.upsert', [{ id: chatId, conversationTimestamp: 123, unreadCount: 1 }]) ev.buffer()
})()) await delay(100)
ev.processInBuffer((async() => { ev.emit('chats.upsert', [{ id: chatId, conversationTimestamp: 123, unreadCount: 1 }])
await delay(200) const flushed = ev.flush()
ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 124, unreadCount: 1 }]) expect(flushed).toBeFalsy()
})()) })(),
(async() => {
ev.buffer()
await delay(200)
ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 124, unreadCount: 1 }])
const flushed = ev.flush()
expect(flushed).toBeFalsy()
})()
])
await ev.flush() const flushed = ev.flush()
expect(flushed).toBeTruthy()
expect(chats).toHaveLength(1) expect(chats).toHaveLength(1)
expect(chats[0].conversationTimestamp).toEqual(124) expect(chats[0].conversationTimestamp).toEqual(124)
@@ -51,7 +60,7 @@ describe('Event Buffer Tests', () => {
ev.emit('chats.delete', [chatId]) ev.emit('chats.delete', [chatId])
ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 124, unreadCount: 1 }]) ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 124, unreadCount: 1 }])
await ev.flush() ev.flush()
expect(chats).toHaveLength(1) expect(chats).toHaveLength(1)
}) })
@@ -68,7 +77,7 @@ describe('Event Buffer Tests', () => {
ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 123, unreadCount: 1 }]) ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 123, unreadCount: 1 }])
ev.emit('chats.delete', [chatId]) ev.emit('chats.delete', [chatId])
await ev.flush() ev.flush()
expect(chatsDeleted).toHaveLength(1) expect(chatsDeleted).toHaveLength(1)
}) })
@@ -103,7 +112,7 @@ describe('Event Buffer Tests', () => {
} }
}]) }])
await ev.flush() ev.flush()
ev.buffer() ev.buffer()
ev.emit('chats.upsert', [{ ev.emit('chats.upsert', [{
@@ -123,7 +132,7 @@ describe('Event Buffer Tests', () => {
messages: [], messages: [],
isLatest: false isLatest: false
}) })
await ev.flush() ev.flush()
expect(chatsUpserted).toHaveLength(1) expect(chatsUpserted).toHaveLength(1)
expect(chatsUpserted[0].id).toEqual(chatId) expect(chatsUpserted[0].id).toEqual(chatId)
@@ -159,7 +168,7 @@ describe('Event Buffer Tests', () => {
muteEndTime: 123 muteEndTime: 123
}]) }])
await ev.flush() ev.flush()
expect(chatsUpserted).toHaveLength(1) expect(chatsUpserted).toHaveLength(1)
expect(chatsUpserted[0].archived).toBeUndefined() expect(chatsUpserted[0].archived).toBeUndefined()
@@ -184,7 +193,7 @@ describe('Event Buffer Tests', () => {
}) })
ev.emit('chats.update', [{ id: chatId, archived: true }]) ev.emit('chats.update', [{ id: chatId, archived: true }])
await ev.flush() ev.flush()
expect(chatRecv).toBeDefined() expect(chatRecv).toBeDefined()
expect(chatRecv?.archived).toBeTruthy() expect(chatRecv?.archived).toBeTruthy()
@@ -218,7 +227,7 @@ describe('Event Buffer Tests', () => {
ev.emit('messages.upsert', { messages: [proto.WebMessageInfo.fromObject(msg)], type: 'notify' }) ev.emit('messages.upsert', { messages: [proto.WebMessageInfo.fromObject(msg)], type: 'notify' })
ev.emit('messages.update', [{ key: msg.key, update: { status: WAMessageStatus.READ } }]) ev.emit('messages.update', [{ key: msg.key, update: { status: WAMessageStatus.READ } }])
await ev.flush() ev.flush()
expect(msgs).toHaveLength(1) expect(msgs).toHaveLength(1)
expect(msgs[0].message).toBeTruthy() expect(msgs[0].message).toBeTruthy()
@@ -254,7 +263,7 @@ describe('Event Buffer Tests', () => {
} }
]) ])
await ev.flush() ev.flush()
expect(msgs).toHaveLength(1) expect(msgs).toHaveLength(1)
expect(msgs[0].userReceipt).toHaveLength(1) expect(msgs[0].userReceipt).toHaveLength(1)
@@ -275,7 +284,7 @@ describe('Event Buffer Tests', () => {
ev.emit('messages.update', [{ key, update: { status: WAMessageStatus.DELIVERY_ACK } }]) ev.emit('messages.update', [{ key, update: { status: WAMessageStatus.DELIVERY_ACK } }])
ev.emit('messages.update', [{ key, update: { status: WAMessageStatus.READ } }]) ev.emit('messages.update', [{ key, update: { status: WAMessageStatus.READ } }])
await ev.flush() ev.flush()
expect(msgs).toHaveLength(1) expect(msgs).toHaveLength(1)
expect(msgs[0].update.status).toEqual(WAMessageStatus.READ) expect(msgs[0].update.status).toEqual(WAMessageStatus.READ)
@@ -303,39 +312,8 @@ describe('Event Buffer Tests', () => {
ev.emit('chats.update', [{ id: msg.key.remoteJid!, unreadCount: 1, conversationTimestamp: msg.messageTimestamp }]) ev.emit('chats.update', [{ id: msg.key.remoteJid!, unreadCount: 1, conversationTimestamp: msg.messageTimestamp }])
ev.emit('messages.update', [{ key: msg.key, update: { status: WAMessageStatus.READ } }]) ev.emit('messages.update', [{ key: msg.key, update: { status: WAMessageStatus.READ } }])
await ev.flush() ev.flush()
expect(chats[0].unreadCount).toBeUndefined() expect(chats[0].unreadCount).toBeUndefined()
}) })
it('should not deadlock', async() => {
const bufferedCode = ev.createBufferedFunction(
async() => {
}
)
ev.buffer()
let resolve: (() => void) | undefined
const initPromise = new Promise<void>(r => {
resolve = r
})
ev.processInBuffer(initPromise)
const flushPromise = ev.flush()
ev.processInBuffer(
(async() => {
await initPromise
await delay(100)
await bufferedCode()
})()
)
resolve!()
await flushPromise
// should resolve
await ev.flush()
})
}) })

View File

@@ -21,8 +21,6 @@ const BUFFERABLE_EVENT = [
'groups.update', 'groups.update',
] as const ] as const
const BUFFER_TIMEOUT_MS = 60_000
type BufferableEvent = typeof BUFFERABLE_EVENT[number] type BufferableEvent = typeof BUFFERABLE_EVENT[number]
/** /**
@@ -41,15 +39,16 @@ type BaileysBufferableEventEmitter = BaileysEventEmitter & {
process(handler: (events: BaileysEventData) => void | Promise<void>): (() => void) process(handler: (events: BaileysEventData) => void | Promise<void>): (() => void)
/** /**
* starts buffering events, call flush() to release them * starts buffering events, call flush() to release them
* @returns true if buffering just started, false if it was already buffering
* */ * */
buffer(): boolean buffer(): void
/** buffers all events till the promise completes */ /** buffers all events till the promise completes */
createBufferedFunction<A extends any[], T>(work: (...args: A) => Promise<T>): ((...args: A) => Promise<T>) createBufferedFunction<A extends any[], T>(work: (...args: A) => Promise<T>): ((...args: A) => Promise<T>)
/** flushes all buffered events */ /**
flush(): Promise<void> * flushes all buffered events
/** waits for the task to complete, before releasing the buffer */ * @param force if true, will flush all data regardless of any pending buffers
processInBuffer(task: Promise<any>) * @returns returns true if the flush actually happened, otherwise false
*/
flush(force?: boolean): boolean
/** is there an ongoing buffer */ /** is there an ongoing buffer */
isBuffering(): boolean isBuffering(): boolean
} }
@@ -64,14 +63,7 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter =
const historyCache = new Set<string>() const historyCache = new Set<string>()
let data = makeBufferData() let data = makeBufferData()
let isBuffering = false let buffersInProgress = 0
let preBufferTask: Promise<any> = Promise.resolve()
// debugging utils
let preBufferTraces: string[] = []
let bufferStartTrace: string | undefined
let bufferTimeout: NodeJS.Timeout | undefined
let waitingForPreBufferEnd = false
// take the generic event and fire it as a baileys event // take the generic event and fire it as a baileys event
ev.on('event', (map: BaileysEventData) => { ev.on('event', (map: BaileysEventData) => {
@@ -80,39 +72,25 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter =
} }
}) })
function startTimeout() {
bufferTimeout = setTimeout(() => {
logger.warn(
{ preBufferTraces, bufferStartTrace, waitingForPreBufferEnd },
'event buffer taking a while'
)
}, BUFFER_TIMEOUT_MS)
}
function buffer() { function buffer() {
if(!isBuffering) { buffersInProgress += 1
logger.trace('buffering events')
isBuffering = true
startTimeout()
bufferStartTrace = new Error('buffer start').stack
return true
}
return false
} }
async function flush() { function flush(force = false) {
if(!isBuffering) { // no buffer going on
return if(!buffersInProgress) {
return false
} }
logger.trace({ preBufferTraces }, 'releasing buffered events...') if(!force) {
waitingForPreBufferEnd = true // reduce the number of buffers in progress
await preBufferTask buffersInProgress -= 1
waitingForPreBufferEnd = false // if there are still some buffers going on
// then we don't flush now
preBufferTraces = [] if(buffersInProgress) {
isBuffering = false return false
}
}
const newData = makeBufferData() const newData = makeBufferData()
const chatUpdates = Object.values(data.chatUpdates) const chatUpdates = Object.values(data.chatUpdates)
@@ -133,12 +111,12 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter =
data = newData data = newData
clearTimeout(bufferTimeout)
logger.trace( logger.trace(
{ conditionalChatUpdatesLeft }, { conditionalChatUpdatesLeft },
'released buffered events' 'released buffered events'
) )
return true
} }
return { return {
@@ -153,40 +131,26 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter =
} }
}, },
emit<T extends BaileysEvent>(event: BaileysEvent, evData: BaileysEventMap[T]) { emit<T extends BaileysEvent>(event: BaileysEvent, evData: BaileysEventMap[T]) {
if(isBuffering && BUFFERABLE_EVENT_SET.has(event)) { if(buffersInProgress && BUFFERABLE_EVENT_SET.has(event)) {
append(data, historyCache, event as any, evData, logger) append(data, historyCache, event as any, evData, logger)
return true return true
} }
return ev.emit('event', { [event]: evData }) return ev.emit('event', { [event]: evData })
}, },
processInBuffer(task) {
if(isBuffering) {
// if flushing right now,
// adding this won't make a difference
if(waitingForPreBufferEnd) {
return
}
preBufferTask = Promise.allSettled([ preBufferTask, task ])
preBufferTraces.push(new Error('').stack!)
}
},
isBuffering() { isBuffering() {
return isBuffering return buffersInProgress > 0
}, },
buffer, buffer,
flush, flush,
createBufferedFunction(work) { createBufferedFunction(work) {
return async(...args) => { return async(...args) => {
const started = buffer() buffer()
try { try {
const result = await work(...args) const result = await work(...args)
return result return result
} finally { } finally {
if(started) { flush()
await flush()
}
} }
} }
}, },