From 30e2cb5c4c8ca84b1814574ed4ce74f8f8f89ecf Mon Sep 17 00:00:00 2001 From: Adhiraj Singh Date: Fri, 2 Dec 2022 11:31:42 +0530 Subject: [PATCH] refactor: impl counter based event buffer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. counter based event buffer keeps track of the number of blocks that request event processing in buffer 2. event buffer only releases events when the last block completes (i.e. counter = 0) this approach is far simpler than the promised based garbled crap I wrote, should also prevent the deadlock issues it introduced 🙏 --- src/Socket/chats.ts | 3 +- src/Socket/messages-recv.ts | 24 ++------- src/Socket/socket.ts | 17 +++++++ src/Tests/test.event-buffer.ts | 78 +++++++++++----------------- src/Utils/event-buffer.ts | 92 +++++++++++----------------------- 5 files changed, 78 insertions(+), 136 deletions(-) diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index 6a2190d..64f3489 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -822,7 +822,8 @@ export const makeChatsSocket = (config: SocketConfig) => { // we keep buffering events until we finally have // the key and can sync the messages if(!authState.creds?.myAppStateKeyId) { - needToFlushWithAppStateSync = ev.buffer() + ev.buffer() + needToFlushWithAppStateSync = true } } }) diff --git a/src/Socket/messages-recv.ts b/src/Socket/messages-recv.ts index d7118f5..75a95d1 100644 --- a/src/Socket/messages-recv.ts +++ b/src/Socket/messages-recv.ts @@ -645,16 +645,9 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { identifier: string, exec: (node: BinaryNode) => Promise ) => { - const started = ev.buffer() - if(started) { - await execTask() - if(started) { - await ev.flush() - } - } else { - const task = execTask() - ev.processInBuffer(task) - } + ev.buffer() + await execTask() + ev.flush() function execTask() { return exec(node) @@ -662,17 +655,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } } - // called when all offline notifs are handled - ws.on('CB:ib,,offline', async(node: BinaryNode) => { - const child = getBinaryNodeChild(node, 'offline') - const offlineNotifs = +(child?.attrs.count || 0) - - logger.info(`handled ${offlineNotifs} offline messages/notifications`) - await ev.flush() - - ev.emit('connection.update', { receivedPendingNotifications: true }) - }) - // recv a message ws.on('CB:message', (node: BinaryNode) => { processNodeWithBuffer(node, 'processing message', handleMessage) diff --git a/src/Socket/socket.ts b/src/Socket/socket.ts index 69c3581..fbbb6e3 100644 --- a/src/Socket/socket.ts +++ b/src/Socket/socket.ts @@ -532,15 +532,32 @@ export const makeSocket = ({ end(new Boom('Multi-device beta not joined', { statusCode: DisconnectReason.multideviceMismatch })) }) + let didStartBuffer = false process.nextTick(() => { if(creds.me?.id) { // start buffering important events // if we're logged in ev.buffer() + didStartBuffer = true } ev.emit('connection.update', { connection: 'connecting', receivedPendingNotifications: false, qr: undefined }) }) + + // called when all offline notifs are handled + ws.on('CB:ib,,offline', (node: BinaryNode) => { + const child = getBinaryNodeChild(node, 'offline') + const offlineNotifs = +(child?.attrs.count || 0) + + logger.info(`handled ${offlineNotifs} offline messages/notifications`) + if(didStartBuffer) { + ev.flush() + logger.trace('flushed events for initial buffer') + } + + ev.emit('connection.update', { receivedPendingNotifications: true }) + }) + // update credentials when required ev.on('creds.update', update => { const name = update.me?.name diff --git a/src/Tests/test.event-buffer.ts b/src/Tests/test.event-buffer.ts index 18a02ed..e2453e1 100644 --- a/src/Tests/test.event-buffer.ts +++ b/src/Tests/test.event-buffer.ts @@ -22,16 +22,25 @@ describe('Event Buffer Tests', () => { ev.on('chats.update', () => fail('should not emit update event')) ev.buffer() - ev.processInBuffer((async() => { - await delay(100) - ev.emit('chats.upsert', [{ id: chatId, conversationTimestamp: 123, unreadCount: 1 }]) - })()) - ev.processInBuffer((async() => { - await delay(200) - ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 124, unreadCount: 1 }]) - })()) + await Promise.all([ + (async() => { + ev.buffer() + await delay(100) + ev.emit('chats.upsert', [{ id: chatId, conversationTimestamp: 123, unreadCount: 1 }]) + const flushed = ev.flush() + expect(flushed).toBeFalsy() + })(), + (async() => { + ev.buffer() + await delay(200) + ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 124, unreadCount: 1 }]) + const flushed = ev.flush() + expect(flushed).toBeFalsy() + })() + ]) - await ev.flush() + const flushed = ev.flush() + expect(flushed).toBeTruthy() expect(chats).toHaveLength(1) expect(chats[0].conversationTimestamp).toEqual(124) @@ -51,7 +60,7 @@ describe('Event Buffer Tests', () => { ev.emit('chats.delete', [chatId]) ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 124, unreadCount: 1 }]) - await ev.flush() + ev.flush() expect(chats).toHaveLength(1) }) @@ -68,7 +77,7 @@ describe('Event Buffer Tests', () => { ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 123, unreadCount: 1 }]) ev.emit('chats.delete', [chatId]) - await ev.flush() + ev.flush() expect(chatsDeleted).toHaveLength(1) }) @@ -103,7 +112,7 @@ describe('Event Buffer Tests', () => { } }]) - await ev.flush() + ev.flush() ev.buffer() ev.emit('chats.upsert', [{ @@ -123,7 +132,7 @@ describe('Event Buffer Tests', () => { messages: [], isLatest: false }) - await ev.flush() + ev.flush() expect(chatsUpserted).toHaveLength(1) expect(chatsUpserted[0].id).toEqual(chatId) @@ -159,7 +168,7 @@ describe('Event Buffer Tests', () => { muteEndTime: 123 }]) - await ev.flush() + ev.flush() expect(chatsUpserted).toHaveLength(1) expect(chatsUpserted[0].archived).toBeUndefined() @@ -184,7 +193,7 @@ describe('Event Buffer Tests', () => { }) ev.emit('chats.update', [{ id: chatId, archived: true }]) - await ev.flush() + ev.flush() expect(chatRecv).toBeDefined() expect(chatRecv?.archived).toBeTruthy() @@ -218,7 +227,7 @@ describe('Event Buffer Tests', () => { ev.emit('messages.upsert', { messages: [proto.WebMessageInfo.fromObject(msg)], type: 'notify' }) ev.emit('messages.update', [{ key: msg.key, update: { status: WAMessageStatus.READ } }]) - await ev.flush() + ev.flush() expect(msgs).toHaveLength(1) expect(msgs[0].message).toBeTruthy() @@ -254,7 +263,7 @@ describe('Event Buffer Tests', () => { } ]) - await ev.flush() + ev.flush() expect(msgs).toHaveLength(1) expect(msgs[0].userReceipt).toHaveLength(1) @@ -275,7 +284,7 @@ describe('Event Buffer Tests', () => { ev.emit('messages.update', [{ key, update: { status: WAMessageStatus.DELIVERY_ACK } }]) ev.emit('messages.update', [{ key, update: { status: WAMessageStatus.READ } }]) - await ev.flush() + ev.flush() expect(msgs).toHaveLength(1) expect(msgs[0].update.status).toEqual(WAMessageStatus.READ) @@ -303,39 +312,8 @@ describe('Event Buffer Tests', () => { ev.emit('chats.update', [{ id: msg.key.remoteJid!, unreadCount: 1, conversationTimestamp: msg.messageTimestamp }]) ev.emit('messages.update', [{ key: msg.key, update: { status: WAMessageStatus.READ } }]) - await ev.flush() + ev.flush() expect(chats[0].unreadCount).toBeUndefined() }) - - it('should not deadlock', async() => { - const bufferedCode = ev.createBufferedFunction( - async() => { - - } - ) - ev.buffer() - - let resolve: (() => void) | undefined - const initPromise = new Promise(r => { - resolve = r - }) - ev.processInBuffer(initPromise) - const flushPromise = ev.flush() - - ev.processInBuffer( - (async() => { - await initPromise - await delay(100) - await bufferedCode() - })() - ) - - resolve!() - - await flushPromise - - // should resolve - await ev.flush() - }) }) \ No newline at end of file diff --git a/src/Utils/event-buffer.ts b/src/Utils/event-buffer.ts index a6e6176..63b349d 100644 --- a/src/Utils/event-buffer.ts +++ b/src/Utils/event-buffer.ts @@ -21,8 +21,6 @@ const BUFFERABLE_EVENT = [ 'groups.update', ] as const -const BUFFER_TIMEOUT_MS = 60_000 - type BufferableEvent = typeof BUFFERABLE_EVENT[number] /** @@ -41,15 +39,16 @@ type BaileysBufferableEventEmitter = BaileysEventEmitter & { process(handler: (events: BaileysEventData) => void | Promise): (() => void) /** * starts buffering events, call flush() to release them - * @returns true if buffering just started, false if it was already buffering * */ - buffer(): boolean + buffer(): void /** buffers all events till the promise completes */ createBufferedFunction(work: (...args: A) => Promise): ((...args: A) => Promise) - /** flushes all buffered events */ - flush(): Promise - /** waits for the task to complete, before releasing the buffer */ - processInBuffer(task: Promise) + /** + * flushes all buffered events + * @param force if true, will flush all data regardless of any pending buffers + * @returns returns true if the flush actually happened, otherwise false + */ + flush(force?: boolean): boolean /** is there an ongoing buffer */ isBuffering(): boolean } @@ -64,14 +63,7 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter = const historyCache = new Set() let data = makeBufferData() - let isBuffering = false - let preBufferTask: Promise = Promise.resolve() - - // debugging utils - let preBufferTraces: string[] = [] - let bufferStartTrace: string | undefined - let bufferTimeout: NodeJS.Timeout | undefined - let waitingForPreBufferEnd = false + let buffersInProgress = 0 // take the generic event and fire it as a baileys event ev.on('event', (map: BaileysEventData) => { @@ -80,39 +72,25 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter = } }) - function startTimeout() { - bufferTimeout = setTimeout(() => { - logger.warn( - { preBufferTraces, bufferStartTrace, waitingForPreBufferEnd }, - 'event buffer taking a while' - ) - }, BUFFER_TIMEOUT_MS) - } - function buffer() { - if(!isBuffering) { - logger.trace('buffering events') - isBuffering = true - startTimeout() - bufferStartTrace = new Error('buffer start').stack - return true - } - - return false + buffersInProgress += 1 } - async function flush() { - if(!isBuffering) { - return + function flush(force = false) { + // no buffer going on + if(!buffersInProgress) { + return false } - logger.trace({ preBufferTraces }, 'releasing buffered events...') - waitingForPreBufferEnd = true - await preBufferTask - waitingForPreBufferEnd = false - - preBufferTraces = [] - isBuffering = false + if(!force) { + // reduce the number of buffers in progress + buffersInProgress -= 1 + // if there are still some buffers going on + // then we don't flush now + if(buffersInProgress) { + return false + } + } const newData = makeBufferData() const chatUpdates = Object.values(data.chatUpdates) @@ -133,12 +111,12 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter = data = newData - clearTimeout(bufferTimeout) - logger.trace( { conditionalChatUpdatesLeft }, 'released buffered events' ) + + return true } return { @@ -153,40 +131,26 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter = } }, emit(event: BaileysEvent, evData: BaileysEventMap[T]) { - if(isBuffering && BUFFERABLE_EVENT_SET.has(event)) { + if(buffersInProgress && BUFFERABLE_EVENT_SET.has(event)) { append(data, historyCache, event as any, evData, logger) return true } return ev.emit('event', { [event]: evData }) }, - processInBuffer(task) { - if(isBuffering) { - // if flushing right now, - // adding this won't make a difference - if(waitingForPreBufferEnd) { - return - } - - preBufferTask = Promise.allSettled([ preBufferTask, task ]) - preBufferTraces.push(new Error('').stack!) - } - }, isBuffering() { - return isBuffering + return buffersInProgress > 0 }, buffer, flush, createBufferedFunction(work) { return async(...args) => { - const started = buffer() + buffer() try { const result = await work(...args) return result } finally { - if(started) { - await flush() - } + flush() } } },