From 334f85f8c5fc4251ee94201965c7593a48d36990 Mon Sep 17 00:00:00 2001 From: Adhiraj Singh Date: Sat, 2 Apr 2022 15:45:09 +0530 Subject: [PATCH] feat: handle "peer_msg" receipts --- src/Socket/messages-recv.ts | 168 ++++++++++++++++----------------- src/Utils/decode-wa-message.ts | 1 + 2 files changed, 82 insertions(+), 87 deletions(-) diff --git a/src/Socket/messages-recv.ts b/src/Socket/messages-recv.ts index 1e627c3..734faa2 100644 --- a/src/Socket/messages-recv.ts +++ b/src/Socket/messages-recv.ts @@ -30,7 +30,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { sendReceipt, resyncMainAppState, emitEventsFromMap, - uploadPreKeys + uploadPreKeys, } = sock /** this mutex ensures that the notifications (receipts, messages etc.) are processed in order */ @@ -142,31 +142,11 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { const processMessageLocal = async(message: proto.IWebMessageInfo) => { const meId = authState.creds.me!.id - // send ack for history message - const isAnyHistoryMsg = !!message.message && !!normalizeMessageContent(message.message)?.protocolMessage?.historySyncNotification - if(isAnyHistoryMsg) { - await sendNode({ - tag: 'receipt', - attrs: { - id: message.key.id, - type: 'hist_sync', - to: jidNormalizedUser(meId) - } - }) - } - // process message and emit events const newEvents = await processMessage( message, { historyCache, meId, keyStore: authState.keys, logger, treatCiphertextMessagesAsReal } ) - - if(isAnyHistoryMsg) { - logger.debug('restarting app sync timeout') - // restart the app state sync timeout - appStateSyncTimeout.start() - } - return newEvents } @@ -280,72 +260,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } } - // recv a message - ws.on('CB:message', (stanza: BinaryNode) => { - const { fullMessage: msg, decryptionTask } = decodeMessageStanza(stanza, authState) - processingMutex.mutex( - msg.key.remoteJid!, - async() => { - await decryptionTask - // message failed to decrypt - if(msg.messageStubType === proto.WebMessageInfo.WebMessageInfoStubType.CIPHERTEXT) { - logger.error( - { msgId: msg.key.id, params: msg.messageStubParameters }, - 'failure in decrypting message' - ) - retryMutex.mutex( - async() => { - if(ws.readyState === ws.OPEN) { - await sendRetryRequest(stanza) - if(retryRequestDelayMs) { - await delay(retryRequestDelayMs) - } - } else { - logger.debug({ stanza }, 'connection closed, ignoring retry req') - } - } - ) - } else { - await sendMessageAck(stanza, { class: 'receipt' }) - // no type in the receipt => message delivered - await sendReceipt(msg.key.remoteJid!, msg.key.participant, [msg.key.id!], undefined) - logger.debug({ msg: msg.key }, 'sent delivery receipt') - } - - msg.key.remoteJid = jidNormalizedUser(msg.key.remoteJid!) - ev.emit('messages.upsert', { messages: [msg], type: stanza.attrs.offline ? 'append' : 'notify' }) - } - ) - .catch( - error => onUnexpectedError(error, 'processing message') - ) - }) - - ws.on('CB:ack,class:message', async(node: BinaryNode) => { - sendNode({ - tag: 'ack', - attrs: { - class: 'receipt', - id: node.attrs.id, - from: node.attrs.from - } - }) - .catch(err => onUnexpectedError(err, 'ack message receipt')) - logger.debug({ attrs: node.attrs }, 'sending receipt for ack') - }) - - ws.on('CB:call', async(node: BinaryNode) => { - logger.info({ node }, 'recv call') - - const [child] = getAllBinaryNodeChildren(node) - if(!!child?.tag) { - sendMessageAck(node, { class: 'call', type: child.tag }) - .catch( - error => onUnexpectedError(error, 'ack call') - ) - } - }) - const sendMessagesAgain = async(key: proto.IMessageKey, ids: string[]) => { const msgs = await Promise.all( ids.map(id => ( @@ -512,6 +426,86 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } } + // recv a message + ws.on('CB:message', (stanza: BinaryNode) => { + const { fullMessage: msg, category, decryptionTask } = decodeMessageStanza(stanza, authState) + processingMutex.mutex( + msg.key.remoteJid!, + async() => { + await decryptionTask + // message failed to decrypt + if(msg.messageStubType === proto.WebMessageInfo.WebMessageInfoStubType.CIPHERTEXT) { + logger.error( + { msgId: msg.key.id, params: msg.messageStubParameters }, + 'failure in decrypting message' + ) + retryMutex.mutex( + async() => { + if(ws.readyState === ws.OPEN) { + await sendRetryRequest(stanza) + if(retryRequestDelayMs) { + await delay(retryRequestDelayMs) + } + } else { + logger.debug({ stanza }, 'connection closed, ignoring retry req') + } + } + ) + } else { + await sendMessageAck(stanza, { class: 'receipt' }) + // no type in the receipt => message delivered + await sendReceipt(msg.key.remoteJid!, msg.key.participant, [msg.key.id!], undefined) + + // send ack for history message + const normalizedContent = !!msg.message ? normalizeMessageContent(msg.message) : undefined + const isAnyHistoryMsg = !!normalizedContent?.protocolMessage?.historySyncNotification + if(isAnyHistoryMsg) { + await sendReceipt(msg.key.remoteJid!, undefined, [msg.key.id], 'hist_sync') + // we only want to sync app state once we've all the history + // restart the app state sync timeout + logger.debug('restarting app sync timeout') + appStateSyncTimeout.start() + } + + if(category === 'peer') { + await sendReceipt(msg.key.remoteJid!, undefined, [msg.key.id], 'peer_msg') + } + } + + msg.key.remoteJid = jidNormalizedUser(msg.key.remoteJid!) + ev.emit('messages.upsert', { messages: [msg], type: stanza.attrs.offline ? 'append' : 'notify' }) + } + ) + .catch( + error => onUnexpectedError(error, 'processing message') + ) + }) + + ws.on('CB:ack,class:message', async(node: BinaryNode) => { + sendNode({ + tag: 'ack', + attrs: { + class: 'receipt', + id: node.attrs.id, + from: node.attrs.from + } + }) + .catch(err => onUnexpectedError(err, 'ack message receipt')) + logger.debug({ attrs: node.attrs }, 'sending receipt for ack') + }) + + ws.on('CB:call', async(node: BinaryNode) => { + logger.info({ node }, 'recv call') + + const [child] = getAllBinaryNodeChildren(node) + if(!!child?.tag) { + sendMessageAck(node, { class: 'call', type: child.tag }) + .catch( + error => onUnexpectedError(error, 'ack call') + ) + } + }) + ws.on('CB:receipt', node => { handleReceipt(node) .catch( diff --git a/src/Utils/decode-wa-message.ts b/src/Utils/decode-wa-message.ts index f8734c4..6aa7314 100644 --- a/src/Utils/decode-wa-message.ts +++ b/src/Utils/decode-wa-message.ts @@ -82,6 +82,7 @@ export const decodeMessageStanza = (stanza: BinaryNode, auth: AuthenticationStat return { fullMessage, + category: stanza.attrs.category, decryptionTask: (async() => { let decryptables = 0 if(Array.isArray(stanza.content)) {