From 05dd53e2ce2e7d648026dbc8540a3057d73707b9 Mon Sep 17 00:00:00 2001 From: Alan Mosko Date: Tue, 17 Jan 2023 11:29:10 -0300 Subject: [PATCH] wip --- proto-extract/index.js | 2 +- src/Defaults/baileys-version.json | 2 +- src/Socket/business.ts | 51 ++++++++++++++---------- src/Socket/chats.ts | 24 +++++++++--- src/Socket/messages-recv.ts | 39 +++++++++---------- src/Socket/socket.ts | 25 +++++++++++- src/Tests/test.event-buffer.ts | 47 ++++++++++++++--------- src/Tests/test.messages.ts | 37 ++++++++++++++++++ src/Types/Product.ts | 11 ++++++ src/Utils/business.ts | 9 ++++- src/Utils/event-buffer.ts | 64 ++++++++++++++----------------- src/Utils/make-mutex.ts | 12 ++++++ 12 files changed, 217 insertions(+), 106 deletions(-) create mode 100644 src/Tests/test.messages.ts diff --git a/proto-extract/index.js b/proto-extract/index.js index a3a6365..02dc143 100644 --- a/proto-extract/index.js +++ b/proto-extract/index.js @@ -285,7 +285,7 @@ async function findAppModules() { const indentation = moduleIndentationMap[info.type]?.indentation let typeName = unnestName(info.type) if(indentation !== parentName && indentation) { - typeName = `${indentation.replace(/\$/g, '.')}.${typeName}` + typeName = `${indentation.replaceAll('$', '.')}.${typeName}` } // if(info.enumValues) { diff --git a/src/Defaults/baileys-version.json b/src/Defaults/baileys-version.json index 5697301..71f7efc 100644 --- a/src/Defaults/baileys-version.json +++ b/src/Defaults/baileys-version.json @@ -1,3 +1,3 @@ { - "version": [2, 2244, 6] + "version": [2, 2243, 7] } \ No newline at end of file diff --git a/src/Socket/business.ts b/src/Socket/business.ts index 114d4f3..457701e 100644 --- a/src/Socket/business.ts +++ b/src/Socket/business.ts @@ -1,6 +1,6 @@ -import { ProductCreate, ProductUpdate, SocketConfig } from '../Types' +import { GetCatalogOptions, ProductCreate, ProductUpdate, SocketConfig } from '../Types' import { parseCatalogNode, parseCollectionsNode, parseOrderDetailsNode, parseProductNode, toProductNode, uploadingNecessaryImagesOfProduct } from '../Utils/business' -import { jidNormalizedUser, S_WHATSAPP_NET } from '../WABinary' +import { BinaryNode, jidNormalizedUser, S_WHATSAPP_NET } from '../WABinary' import { getBinaryNodeChild } from '../WABinary/generic-utils' import { makeMessagesRecvSocket } from './messages-recv' @@ -12,9 +12,36 @@ export const makeBusinessSocket = (config: SocketConfig) => { waUploadToServer } = sock - const getCatalog = async(jid?: string, limit = 10) => { + const getCatalog = async({ jid, limit, cursor }: GetCatalogOptions) => { jid = jid || authState.creds.me?.id jid = jidNormalizedUser(jid!) + + const queryParamNodes: BinaryNode[] = [ + { + tag: 'limit', + attrs: { }, + content: Buffer.from((limit || 10).toString()) + }, + { + tag: 'width', + attrs: { }, + content: Buffer.from('100') + }, + { + tag: 'height', + attrs: { }, + content: Buffer.from('100') + }, + ] + + if(cursor) { + queryParamNodes.push({ + tag: 'after', + attrs: { }, + content: cursor + }) + } + const result = await query({ tag: 'iq', attrs: { @@ -29,23 +56,7 @@ export const makeBusinessSocket = (config: SocketConfig) => { jid, allow_shop_source: 'true' }, - content: [ - { - tag: 'limit', - attrs: { }, - content: Buffer.from(limit.toString()) - }, - { - tag: 'width', - attrs: { }, - content: Buffer.from('100') - }, - { - tag: 'height', - attrs: { }, - content: Buffer.from('100') - } - ] + content: queryParamNodes } ] }) diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index 6a2190d..ee34528 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -494,14 +494,27 @@ export const makeChatsSocket = (config: SocketConfig) => { } } - const presenceSubscribe = (toJid: string) => ( + /** + * @param toJid the jid to subscribe to + * @param tcToken token for subscription, use if present + */ + const presenceSubscribe = (toJid: string, tcToken?: Buffer) => ( sendNode({ tag: 'presence', attrs: { to: toJid, id: generateMessageTag(), type: 'subscribe' - } + }, + content: tcToken + ? [ + { + tag: 'tctoken', + attrs: { }, + content: tcToken + } + ] + : undefined }) ) @@ -725,7 +738,7 @@ export const makeChatsSocket = (config: SocketConfig) => { ) : false - if(shouldProcessHistoryMsg && !authState.creds.myAppStateKeyId) { + if(historyMsg && !authState.creds.myAppStateKeyId) { logger.warn('skipping app state sync, as myAppStateKeyId is not set') pendingAppStateSync = true } @@ -733,7 +746,7 @@ export const makeChatsSocket = (config: SocketConfig) => { await Promise.all([ (async() => { if( - shouldProcessHistoryMsg + historyMsg && authState.creds.myAppStateKeyId ) { pendingAppStateSync = false @@ -822,7 +835,8 @@ export const makeChatsSocket = (config: SocketConfig) => { // we keep buffering events until we finally have // the key and can sync the messages if(!authState.creds?.myAppStateKeyId) { - needToFlushWithAppStateSync = ev.buffer() + ev.buffer() + needToFlushWithAppStateSync = true } } }) diff --git a/src/Socket/messages-recv.ts b/src/Socket/messages-recv.ts index d7118f5..7310b4d 100644 --- a/src/Socket/messages-recv.ts +++ b/src/Socket/messages-recv.ts @@ -268,6 +268,21 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { const from = jidNormalizedUser(node.attrs.from) switch (nodeType) { + case 'privacy_token': + const tokenList = getBinaryNodeChildren(child, 'token') + for(const { attrs, content } of tokenList) { + const jid = attrs.jid + ev.emit('chats.update', [ + { + id: jid, + tcToken: content as Buffer + } + ]) + + logger.debug({ jid }, 'got privacy token update') + } + + break case 'w:gp2': handleGroupNotification(node.attrs.participant, child, result) break @@ -645,16 +660,9 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { identifier: string, exec: (node: BinaryNode) => Promise ) => { - const started = ev.buffer() - if(started) { - await execTask() - if(started) { - await ev.flush() - } - } else { - const task = execTask() - ev.processInBuffer(task) - } + ev.buffer() + await execTask() + ev.flush() function execTask() { return exec(node) @@ -662,17 +670,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } } - // called when all offline notifs are handled - ws.on('CB:ib,,offline', async(node: BinaryNode) => { - const child = getBinaryNodeChild(node, 'offline') - const offlineNotifs = +(child?.attrs.count || 0) - - logger.info(`handled ${offlineNotifs} offline messages/notifications`) - await ev.flush() - - ev.emit('connection.update', { receivedPendingNotifications: true }) - }) - // recv a message ws.on('CB:message', (node: BinaryNode) => { processNodeWithBuffer(node, 'processing message', handleMessage) diff --git a/src/Socket/socket.ts b/src/Socket/socket.ts index 34b546a..fbbb6e3 100644 --- a/src/Socket/socket.ts +++ b/src/Socket/socket.ts @@ -532,11 +532,32 @@ export const makeSocket = ({ end(new Boom('Multi-device beta not joined', { statusCode: DisconnectReason.multideviceMismatch })) }) + let didStartBuffer = false process.nextTick(() => { - // start buffering important events - ev.buffer() + if(creds.me?.id) { + // start buffering important events + // if we're logged in + ev.buffer() + didStartBuffer = true + } + ev.emit('connection.update', { connection: 'connecting', receivedPendingNotifications: false, qr: undefined }) }) + + // called when all offline notifs are handled + ws.on('CB:ib,,offline', (node: BinaryNode) => { + const child = getBinaryNodeChild(node, 'offline') + const offlineNotifs = +(child?.attrs.count || 0) + + logger.info(`handled ${offlineNotifs} offline messages/notifications`) + if(didStartBuffer) { + ev.flush() + logger.trace('flushed events for initial buffer') + } + + ev.emit('connection.update', { receivedPendingNotifications: true }) + }) + // update credentials when required ev.on('creds.update', update => { const name = update.me?.name diff --git a/src/Tests/test.event-buffer.ts b/src/Tests/test.event-buffer.ts index 71c98f7..e2453e1 100644 --- a/src/Tests/test.event-buffer.ts +++ b/src/Tests/test.event-buffer.ts @@ -22,16 +22,25 @@ describe('Event Buffer Tests', () => { ev.on('chats.update', () => fail('should not emit update event')) ev.buffer() - ev.processInBuffer((async() => { - await delay(100) - ev.emit('chats.upsert', [{ id: chatId, conversationTimestamp: 123, unreadCount: 1 }]) - })()) - ev.processInBuffer((async() => { - await delay(200) - ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 124, unreadCount: 1 }]) - })()) + await Promise.all([ + (async() => { + ev.buffer() + await delay(100) + ev.emit('chats.upsert', [{ id: chatId, conversationTimestamp: 123, unreadCount: 1 }]) + const flushed = ev.flush() + expect(flushed).toBeFalsy() + })(), + (async() => { + ev.buffer() + await delay(200) + ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 124, unreadCount: 1 }]) + const flushed = ev.flush() + expect(flushed).toBeFalsy() + })() + ]) - await ev.flush() + const flushed = ev.flush() + expect(flushed).toBeTruthy() expect(chats).toHaveLength(1) expect(chats[0].conversationTimestamp).toEqual(124) @@ -51,7 +60,7 @@ describe('Event Buffer Tests', () => { ev.emit('chats.delete', [chatId]) ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 124, unreadCount: 1 }]) - await ev.flush() + ev.flush() expect(chats).toHaveLength(1) }) @@ -68,7 +77,7 @@ describe('Event Buffer Tests', () => { ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 123, unreadCount: 1 }]) ev.emit('chats.delete', [chatId]) - await ev.flush() + ev.flush() expect(chatsDeleted).toHaveLength(1) }) @@ -103,7 +112,7 @@ describe('Event Buffer Tests', () => { } }]) - await ev.flush() + ev.flush() ev.buffer() ev.emit('chats.upsert', [{ @@ -123,7 +132,7 @@ describe('Event Buffer Tests', () => { messages: [], isLatest: false }) - await ev.flush() + ev.flush() expect(chatsUpserted).toHaveLength(1) expect(chatsUpserted[0].id).toEqual(chatId) @@ -159,7 +168,7 @@ describe('Event Buffer Tests', () => { muteEndTime: 123 }]) - await ev.flush() + ev.flush() expect(chatsUpserted).toHaveLength(1) expect(chatsUpserted[0].archived).toBeUndefined() @@ -184,7 +193,7 @@ describe('Event Buffer Tests', () => { }) ev.emit('chats.update', [{ id: chatId, archived: true }]) - await ev.flush() + ev.flush() expect(chatRecv).toBeDefined() expect(chatRecv?.archived).toBeTruthy() @@ -218,7 +227,7 @@ describe('Event Buffer Tests', () => { ev.emit('messages.upsert', { messages: [proto.WebMessageInfo.fromObject(msg)], type: 'notify' }) ev.emit('messages.update', [{ key: msg.key, update: { status: WAMessageStatus.READ } }]) - await ev.flush() + ev.flush() expect(msgs).toHaveLength(1) expect(msgs[0].message).toBeTruthy() @@ -254,7 +263,7 @@ describe('Event Buffer Tests', () => { } ]) - await ev.flush() + ev.flush() expect(msgs).toHaveLength(1) expect(msgs[0].userReceipt).toHaveLength(1) @@ -275,7 +284,7 @@ describe('Event Buffer Tests', () => { ev.emit('messages.update', [{ key, update: { status: WAMessageStatus.DELIVERY_ACK } }]) ev.emit('messages.update', [{ key, update: { status: WAMessageStatus.READ } }]) - await ev.flush() + ev.flush() expect(msgs).toHaveLength(1) expect(msgs[0].update.status).toEqual(WAMessageStatus.READ) @@ -303,7 +312,7 @@ describe('Event Buffer Tests', () => { ev.emit('chats.update', [{ id: msg.key.remoteJid!, unreadCount: 1, conversationTimestamp: msg.messageTimestamp }]) ev.emit('messages.update', [{ key: msg.key, update: { status: WAMessageStatus.READ } }]) - await ev.flush() + ev.flush() expect(chats[0].unreadCount).toBeUndefined() }) diff --git a/src/Tests/test.messages.ts b/src/Tests/test.messages.ts new file mode 100644 index 0000000..7f51f39 --- /dev/null +++ b/src/Tests/test.messages.ts @@ -0,0 +1,37 @@ +import { WAMessageContent } from '../Types' +import { normalizeMessageContent } from '../Utils' + +describe('Messages Tests', () => { + + it('should correctly unwrap messages', () => { + const CONTENT = { imageMessage: { } } + expectRightContent(CONTENT) + expectRightContent({ + ephemeralMessage: { message: CONTENT } + }) + expectRightContent({ + viewOnceMessage: { + message: { + ephemeralMessage: { message: CONTENT } + } + } + }) + expectRightContent({ + viewOnceMessage: { + message: { + viewOnceMessageV2: { + message: { + ephemeralMessage: { message: CONTENT } + } + } + } + } + }) + + function expectRightContent(content: WAMessageContent) { + expect( + normalizeMessageContent(content) + ).toHaveProperty('imageMessage') + } + }) +}) \ No newline at end of file diff --git a/src/Types/Product.ts b/src/Types/Product.ts index ca4d5d7..4fcb0fc 100644 --- a/src/Types/Product.ts +++ b/src/Types/Product.ts @@ -70,4 +70,15 @@ export type OrderProduct = { export type OrderDetails = { price: OrderPrice products: OrderProduct[] +} + +export type CatalogCursor = string + +export type GetCatalogOptions = { + /** cursor to start from */ + cursor?: CatalogCursor + /** number of products to fetch */ + limit?: number + + jid?: string } \ No newline at end of file diff --git a/src/Utils/business.ts b/src/Utils/business.ts index f49470e..b2fd3f4 100644 --- a/src/Utils/business.ts +++ b/src/Utils/business.ts @@ -7,7 +7,14 @@ import { getStream, getUrlFromDirectPath, toReadable } from './messages-media' export const parseCatalogNode = (node: BinaryNode) => { const catalogNode = getBinaryNodeChild(node, 'product_catalog') const products = getBinaryNodeChildren(catalogNode, 'product').map(parseProductNode) - return { products } + const paging = getBinaryNodeChild(catalogNode, 'paging') + + return { + products, + nextPageCursor: paging + ? getBinaryNodeChildString(paging, 'after') + : undefined + } } export const parseCollectionsNode = (node: BinaryNode) => { diff --git a/src/Utils/event-buffer.ts b/src/Utils/event-buffer.ts index f924e9e..63b349d 100644 --- a/src/Utils/event-buffer.ts +++ b/src/Utils/event-buffer.ts @@ -39,15 +39,16 @@ type BaileysBufferableEventEmitter = BaileysEventEmitter & { process(handler: (events: BaileysEventData) => void | Promise): (() => void) /** * starts buffering events, call flush() to release them - * @returns true if buffering just started, false if it was already buffering * */ - buffer(): boolean + buffer(): void /** buffers all events till the promise completes */ createBufferedFunction(work: (...args: A) => Promise): ((...args: A) => Promise) - /** flushes all buffered events */ - flush(): Promise - /** waits for the task to complete, before releasing the buffer */ - processInBuffer(task: Promise) + /** + * flushes all buffered events + * @param force if true, will flush all data regardless of any pending buffers + * @returns returns true if the flush actually happened, otherwise false + */ + flush(force?: boolean): boolean /** is there an ongoing buffer */ isBuffering(): boolean } @@ -62,9 +63,7 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter = const historyCache = new Set() let data = makeBufferData() - let isBuffering = false - let preBufferTask: Promise = Promise.resolve() - let preBufferTraces: string[] = [] + let buffersInProgress = 0 // take the generic event and fire it as a baileys event ev.on('event', (map: BaileysEventData) => { @@ -74,25 +73,24 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter = }) function buffer() { - if(!isBuffering) { - logger.trace('buffering events') - isBuffering = true - return true - } - - return false + buffersInProgress += 1 } - async function flush() { - if(!isBuffering) { - return + function flush(force = false) { + // no buffer going on + if(!buffersInProgress) { + return false } - logger.trace({ preBufferTraces }, 'releasing buffered events...') - await preBufferTask - - preBufferTraces = [] - isBuffering = false + if(!force) { + // reduce the number of buffers in progress + buffersInProgress -= 1 + // if there are still some buffers going on + // then we don't flush now + if(buffersInProgress) { + return false + } + } const newData = makeBufferData() const chatUpdates = Object.values(data.chatUpdates) @@ -117,6 +115,8 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter = { conditionalChatUpdatesLeft }, 'released buffered events' ) + + return true } return { @@ -131,34 +131,26 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter = } }, emit(event: BaileysEvent, evData: BaileysEventMap[T]) { - if(isBuffering && BUFFERABLE_EVENT_SET.has(event)) { + if(buffersInProgress && BUFFERABLE_EVENT_SET.has(event)) { append(data, historyCache, event as any, evData, logger) return true } return ev.emit('event', { [event]: evData }) }, - processInBuffer(task) { - if(isBuffering) { - preBufferTask = Promise.allSettled([ preBufferTask, task ]) - preBufferTraces.push(new Error('').stack!) - } - }, isBuffering() { - return isBuffering + return buffersInProgress > 0 }, buffer, flush, createBufferedFunction(work) { return async(...args) => { - const started = buffer() + buffer() try { const result = await work(...args) return result } finally { - if(started) { - await flush() - } + flush() } } }, diff --git a/src/Utils/make-mutex.ts b/src/Utils/make-mutex.ts index b02878f..a632caa 100644 --- a/src/Utils/make-mutex.ts +++ b/src/Utils/make-mutex.ts @@ -1,8 +1,20 @@ +import logger from './logger' + +const MUTEX_TIMEOUT_MS = 60_000 + export const makeMutex = () => { let task = Promise.resolve() as Promise + + let taskTimeout: NodeJS.Timeout | undefined + return { mutex(code: () => Promise | T): Promise { task = (async() => { + const stack = new Error('mutex start').stack + let waitOver = false + taskTimeout = setTimeout(() => { + logger.warn({ stack, waitOver }, 'possible mutex deadlock') + }, MUTEX_TIMEOUT_MS) // wait for the previous task to complete // if there is an error, we swallow so as to not block the queue try {