diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index 6a2190d..64f3489 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -822,7 +822,8 @@ export const makeChatsSocket = (config: SocketConfig) => { // we keep buffering events until we finally have // the key and can sync the messages if(!authState.creds?.myAppStateKeyId) { - needToFlushWithAppStateSync = ev.buffer() + ev.buffer() + needToFlushWithAppStateSync = true } } }) diff --git a/src/Socket/messages-recv.ts b/src/Socket/messages-recv.ts index d7118f5..75a95d1 100644 --- a/src/Socket/messages-recv.ts +++ b/src/Socket/messages-recv.ts @@ -645,16 +645,9 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { identifier: string, exec: (node: BinaryNode) => Promise ) => { - const started = ev.buffer() - if(started) { - await execTask() - if(started) { - await ev.flush() - } - } else { - const task = execTask() - ev.processInBuffer(task) - } + ev.buffer() + await execTask() + ev.flush() function execTask() { return exec(node) @@ -662,17 +655,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } } - // called when all offline notifs are handled - ws.on('CB:ib,,offline', async(node: BinaryNode) => { - const child = getBinaryNodeChild(node, 'offline') - const offlineNotifs = +(child?.attrs.count || 0) - - logger.info(`handled ${offlineNotifs} offline messages/notifications`) - await ev.flush() - - ev.emit('connection.update', { receivedPendingNotifications: true }) - }) - // recv a message ws.on('CB:message', (node: BinaryNode) => { processNodeWithBuffer(node, 'processing message', handleMessage) diff --git a/src/Socket/socket.ts b/src/Socket/socket.ts index 69c3581..fbbb6e3 100644 --- a/src/Socket/socket.ts +++ b/src/Socket/socket.ts @@ -532,15 +532,32 @@ export const makeSocket = ({ end(new Boom('Multi-device beta not joined', { statusCode: DisconnectReason.multideviceMismatch })) }) + let didStartBuffer = false process.nextTick(() => { if(creds.me?.id) { // start buffering important events // if we're logged in ev.buffer() + didStartBuffer = true } ev.emit('connection.update', { connection: 'connecting', receivedPendingNotifications: false, qr: undefined }) }) + + // called when all offline notifs are handled + ws.on('CB:ib,,offline', (node: BinaryNode) => { + const child = getBinaryNodeChild(node, 'offline') + const offlineNotifs = +(child?.attrs.count || 0) + + logger.info(`handled ${offlineNotifs} offline messages/notifications`) + if(didStartBuffer) { + ev.flush() + logger.trace('flushed events for initial buffer') + } + + ev.emit('connection.update', { receivedPendingNotifications: true }) + }) + // update credentials when required ev.on('creds.update', update => { const name = update.me?.name diff --git a/src/Tests/test.event-buffer.ts b/src/Tests/test.event-buffer.ts index 18a02ed..e2453e1 100644 --- a/src/Tests/test.event-buffer.ts +++ b/src/Tests/test.event-buffer.ts @@ -22,16 +22,25 @@ describe('Event Buffer Tests', () => { ev.on('chats.update', () => fail('should not emit update event')) ev.buffer() - ev.processInBuffer((async() => { - await delay(100) - ev.emit('chats.upsert', [{ id: chatId, conversationTimestamp: 123, unreadCount: 1 }]) - })()) - ev.processInBuffer((async() => { - await delay(200) - ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 124, unreadCount: 1 }]) - })()) + await Promise.all([ + (async() => { + ev.buffer() + await delay(100) + ev.emit('chats.upsert', [{ id: chatId, conversationTimestamp: 123, unreadCount: 1 }]) + const flushed = ev.flush() + expect(flushed).toBeFalsy() + })(), + (async() => { + ev.buffer() + await delay(200) + ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 124, unreadCount: 1 }]) + const flushed = ev.flush() + expect(flushed).toBeFalsy() + })() + ]) - await ev.flush() + const flushed = ev.flush() + expect(flushed).toBeTruthy() expect(chats).toHaveLength(1) expect(chats[0].conversationTimestamp).toEqual(124) @@ -51,7 +60,7 @@ describe('Event Buffer Tests', () => { ev.emit('chats.delete', [chatId]) ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 124, unreadCount: 1 }]) - await ev.flush() + ev.flush() expect(chats).toHaveLength(1) }) @@ -68,7 +77,7 @@ describe('Event Buffer Tests', () => { ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 123, unreadCount: 1 }]) ev.emit('chats.delete', [chatId]) - await ev.flush() + ev.flush() expect(chatsDeleted).toHaveLength(1) }) @@ -103,7 +112,7 @@ describe('Event Buffer Tests', () => { } }]) - await ev.flush() + ev.flush() ev.buffer() ev.emit('chats.upsert', [{ @@ -123,7 +132,7 @@ describe('Event Buffer Tests', () => { messages: [], isLatest: false }) - await ev.flush() + ev.flush() expect(chatsUpserted).toHaveLength(1) expect(chatsUpserted[0].id).toEqual(chatId) @@ -159,7 +168,7 @@ describe('Event Buffer Tests', () => { muteEndTime: 123 }]) - await ev.flush() + ev.flush() expect(chatsUpserted).toHaveLength(1) expect(chatsUpserted[0].archived).toBeUndefined() @@ -184,7 +193,7 @@ describe('Event Buffer Tests', () => { }) ev.emit('chats.update', [{ id: chatId, archived: true }]) - await ev.flush() + ev.flush() expect(chatRecv).toBeDefined() expect(chatRecv?.archived).toBeTruthy() @@ -218,7 +227,7 @@ describe('Event Buffer Tests', () => { ev.emit('messages.upsert', { messages: [proto.WebMessageInfo.fromObject(msg)], type: 'notify' }) ev.emit('messages.update', [{ key: msg.key, update: { status: WAMessageStatus.READ } }]) - await ev.flush() + ev.flush() expect(msgs).toHaveLength(1) expect(msgs[0].message).toBeTruthy() @@ -254,7 +263,7 @@ describe('Event Buffer Tests', () => { } ]) - await ev.flush() + ev.flush() expect(msgs).toHaveLength(1) expect(msgs[0].userReceipt).toHaveLength(1) @@ -275,7 +284,7 @@ describe('Event Buffer Tests', () => { ev.emit('messages.update', [{ key, update: { status: WAMessageStatus.DELIVERY_ACK } }]) ev.emit('messages.update', [{ key, update: { status: WAMessageStatus.READ } }]) - await ev.flush() + ev.flush() expect(msgs).toHaveLength(1) expect(msgs[0].update.status).toEqual(WAMessageStatus.READ) @@ -303,39 +312,8 @@ describe('Event Buffer Tests', () => { ev.emit('chats.update', [{ id: msg.key.remoteJid!, unreadCount: 1, conversationTimestamp: msg.messageTimestamp }]) ev.emit('messages.update', [{ key: msg.key, update: { status: WAMessageStatus.READ } }]) - await ev.flush() + ev.flush() expect(chats[0].unreadCount).toBeUndefined() }) - - it('should not deadlock', async() => { - const bufferedCode = ev.createBufferedFunction( - async() => { - - } - ) - ev.buffer() - - let resolve: (() => void) | undefined - const initPromise = new Promise(r => { - resolve = r - }) - ev.processInBuffer(initPromise) - const flushPromise = ev.flush() - - ev.processInBuffer( - (async() => { - await initPromise - await delay(100) - await bufferedCode() - })() - ) - - resolve!() - - await flushPromise - - // should resolve - await ev.flush() - }) }) \ No newline at end of file diff --git a/src/Utils/event-buffer.ts b/src/Utils/event-buffer.ts index a6e6176..63b349d 100644 --- a/src/Utils/event-buffer.ts +++ b/src/Utils/event-buffer.ts @@ -21,8 +21,6 @@ const BUFFERABLE_EVENT = [ 'groups.update', ] as const -const BUFFER_TIMEOUT_MS = 60_000 - type BufferableEvent = typeof BUFFERABLE_EVENT[number] /** @@ -41,15 +39,16 @@ type BaileysBufferableEventEmitter = BaileysEventEmitter & { process(handler: (events: BaileysEventData) => void | Promise): (() => void) /** * starts buffering events, call flush() to release them - * @returns true if buffering just started, false if it was already buffering * */ - buffer(): boolean + buffer(): void /** buffers all events till the promise completes */ createBufferedFunction(work: (...args: A) => Promise): ((...args: A) => Promise) - /** flushes all buffered events */ - flush(): Promise - /** waits for the task to complete, before releasing the buffer */ - processInBuffer(task: Promise) + /** + * flushes all buffered events + * @param force if true, will flush all data regardless of any pending buffers + * @returns returns true if the flush actually happened, otherwise false + */ + flush(force?: boolean): boolean /** is there an ongoing buffer */ isBuffering(): boolean } @@ -64,14 +63,7 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter = const historyCache = new Set() let data = makeBufferData() - let isBuffering = false - let preBufferTask: Promise = Promise.resolve() - - // debugging utils - let preBufferTraces: string[] = [] - let bufferStartTrace: string | undefined - let bufferTimeout: NodeJS.Timeout | undefined - let waitingForPreBufferEnd = false + let buffersInProgress = 0 // take the generic event and fire it as a baileys event ev.on('event', (map: BaileysEventData) => { @@ -80,39 +72,25 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter = } }) - function startTimeout() { - bufferTimeout = setTimeout(() => { - logger.warn( - { preBufferTraces, bufferStartTrace, waitingForPreBufferEnd }, - 'event buffer taking a while' - ) - }, BUFFER_TIMEOUT_MS) - } - function buffer() { - if(!isBuffering) { - logger.trace('buffering events') - isBuffering = true - startTimeout() - bufferStartTrace = new Error('buffer start').stack - return true - } - - return false + buffersInProgress += 1 } - async function flush() { - if(!isBuffering) { - return + function flush(force = false) { + // no buffer going on + if(!buffersInProgress) { + return false } - logger.trace({ preBufferTraces }, 'releasing buffered events...') - waitingForPreBufferEnd = true - await preBufferTask - waitingForPreBufferEnd = false - - preBufferTraces = [] - isBuffering = false + if(!force) { + // reduce the number of buffers in progress + buffersInProgress -= 1 + // if there are still some buffers going on + // then we don't flush now + if(buffersInProgress) { + return false + } + } const newData = makeBufferData() const chatUpdates = Object.values(data.chatUpdates) @@ -133,12 +111,12 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter = data = newData - clearTimeout(bufferTimeout) - logger.trace( { conditionalChatUpdatesLeft }, 'released buffered events' ) + + return true } return { @@ -153,40 +131,26 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter = } }, emit(event: BaileysEvent, evData: BaileysEventMap[T]) { - if(isBuffering && BUFFERABLE_EVENT_SET.has(event)) { + if(buffersInProgress && BUFFERABLE_EVENT_SET.has(event)) { append(data, historyCache, event as any, evData, logger) return true } return ev.emit('event', { [event]: evData }) }, - processInBuffer(task) { - if(isBuffering) { - // if flushing right now, - // adding this won't make a difference - if(waitingForPreBufferEnd) { - return - } - - preBufferTask = Promise.allSettled([ preBufferTask, task ]) - preBufferTraces.push(new Error('').stack!) - } - }, isBuffering() { - return isBuffering + return buffersInProgress > 0 }, buffer, flush, createBufferedFunction(work) { return async(...args) => { - const started = buffer() + buffer() try { const result = await work(...args) return result } finally { - if(started) { - await flush() - } + flush() } } },