diff --git a/src/Socket/messages-recv.ts b/src/Socket/messages-recv.ts index a7b142e..6066680 100644 --- a/src/Socket/messages-recv.ts +++ b/src/Socket/messages-recv.ts @@ -622,67 +622,70 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { ids.push(...items.map(i => i.attrs.id)) } - await Promise.all([ - processingMutex.mutex( - async() => { - const status = getStatusFromReceiptType(attrs.type) - if( - typeof status !== 'undefined' && - ( - // basically, we only want to know when a message from us has been delivered to/read by the other person - // or another device of ours has read some messages - status > proto.WebMessageInfo.Status.DELIVERY_ACK || - !isNodeFromMe - ) - ) { - if(isJidGroup(remoteJid) || isJidStatusBroadcast(remoteJid)) { - if(attrs.participant) { - const updateKey: keyof MessageUserReceipt = status === proto.WebMessageInfo.Status.DELIVERY_ACK ? 'receiptTimestamp' : 'readTimestamp' + try { + await Promise.all([ + processingMutex.mutex( + async() => { + const status = getStatusFromReceiptType(attrs.type) + if( + typeof status !== 'undefined' && + ( + // basically, we only want to know when a message from us has been delivered to/read by the other person + // or another device of ours has read some messages + status > proto.WebMessageInfo.Status.DELIVERY_ACK || + !isNodeFromMe + ) + ) { + if(isJidGroup(remoteJid) || isJidStatusBroadcast(remoteJid)) { + if(attrs.participant) { + const updateKey: keyof MessageUserReceipt = status === proto.WebMessageInfo.Status.DELIVERY_ACK ? 'receiptTimestamp' : 'readTimestamp' + ev.emit( + 'message-receipt.update', + ids.map(id => ({ + key: { ...key, id }, + receipt: { + userJid: jidNormalizedUser(attrs.participant), + [updateKey]: +attrs.t + } + })) + ) + } + } else { ev.emit( - 'message-receipt.update', + 'messages.update', ids.map(id => ({ key: { ...key, id }, - receipt: { - userJid: jidNormalizedUser(attrs.participant), - [updateKey]: +attrs.t - } + update: { status } })) ) } - } else { - ev.emit( - 'messages.update', - ids.map(id => ({ - key: { ...key, id }, - update: { status } - })) - ) } - } - if(attrs.type === 'retry') { - // correctly set who is asking for the retry - key.participant = key.participant || attrs.from - const retryNode = getBinaryNodeChild(node, 'retry') - if(willSendMessageAgain(ids[0], key.participant)) { - if(key.fromMe) { - try { - logger.debug({ attrs, key }, 'recv retry request') - await sendMessagesAgain(key, ids, retryNode!) - } catch(error) { - logger.error({ key, ids, trace: error.stack }, 'error in sending message again') + if(attrs.type === 'retry') { + // correctly set who is asking for the retry + key.participant = key.participant || attrs.from + const retryNode = getBinaryNodeChild(node, 'retry') + if(willSendMessageAgain(ids[0], key.participant)) { + if(key.fromMe) { + try { + logger.debug({ attrs, key }, 'recv retry request') + await sendMessagesAgain(key, ids, retryNode!) + } catch(error) { + logger.error({ key, ids, trace: error.stack }, 'error in sending message again') + } + } else { + logger.info({ attrs, key }, 'recv retry for not fromMe message') } } else { - logger.info({ attrs, key }, 'recv retry for not fromMe message') + logger.info({ attrs, key }, 'will not send message again, as sent too many times') } - } else { - logger.info({ attrs, key }, 'will not send message again, as sent too many times') } } - } - ), - sendMessageAck(node) - ]) + ) + ]) + } finally { + await sendMessageAck(node) + } } const handleNotification = async(node: BinaryNode) => { @@ -693,29 +696,32 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { return } - await Promise.all([ - processingMutex.mutex( - async() => { - const msg = await processNotification(node) - if(msg) { - const fromMe = areJidsSameUser(node.attrs.participant || remoteJid, authState.creds.me!.id) - msg.key = { - remoteJid, - fromMe, - participant: node.attrs.participant, - id: node.attrs.id, - ...(msg.key || {}) - } - msg.participant ??= node.attrs.participant - msg.messageTimestamp = +node.attrs.t + try { + await Promise.all([ + processingMutex.mutex( + async() => { + const msg = await processNotification(node) + if(msg) { + const fromMe = areJidsSameUser(node.attrs.participant || remoteJid, authState.creds.me!.id) + msg.key = { + remoteJid, + fromMe, + participant: node.attrs.participant, + id: node.attrs.id, + ...(msg.key || {}) + } + msg.participant ??= node.attrs.participant + msg.messageTimestamp = +node.attrs.t - const fullMsg = proto.WebMessageInfo.fromObject(msg) - await upsertMessage(fullMsg, 'append') + const fullMsg = proto.WebMessageInfo.fromObject(msg) + await upsertMessage(fullMsg, 'append') + } } - } - ), - sendMessageAck(node) - ]) + ) + ]) + } finally { + await sendMessageAck(node) + } } const handleMessage = async(node: BinaryNode) => { @@ -761,62 +767,65 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } } - await Promise.all([ - processingMutex.mutex( - async() => { - await decrypt() - // message failed to decrypt - if(msg.messageStubType === proto.WebMessageInfo.StubType.CIPHERTEXT) { - retryMutex.mutex( - async() => { - if(ws.isOpen) { - if(getBinaryNodeChild(node, 'unavailable')) { - return - } + try { + await Promise.all([ + processingMutex.mutex( + async() => { + await decrypt() + // message failed to decrypt + if(msg.messageStubType === proto.WebMessageInfo.StubType.CIPHERTEXT) { + retryMutex.mutex( + async() => { + if(ws.isOpen) { + if(getBinaryNodeChild(node, 'unavailable')) { + return + } - const encNode = getBinaryNodeChild(node, 'enc') - await sendRetryRequest(node, !encNode) - if(retryRequestDelayMs) { - await delay(retryRequestDelayMs) + const encNode = getBinaryNodeChild(node, 'enc') + await sendRetryRequest(node, !encNode) + if(retryRequestDelayMs) { + await delay(retryRequestDelayMs) + } + } else { + logger.debug({ node }, 'connection closed, ignoring retry req') } - } else { - logger.debug({ node }, 'connection closed, ignoring retry req') } + ) + } else { + // no type in the receipt => message delivered + let type: MessageReceiptType = undefined + let participant = msg.key.participant + if(category === 'peer') { // special peer message + type = 'peer_msg' + } else if(msg.key.fromMe) { // message was sent by us from a different device + type = 'sender' + // need to specially handle this case + if(isJidUser(msg.key.remoteJid!)) { + participant = author + } + } else if(!sendActiveReceipts) { + type = 'inactive' } - ) - } else { - // no type in the receipt => message delivered - let type: MessageReceiptType = undefined - let participant = msg.key.participant - if(category === 'peer') { // special peer message - type = 'peer_msg' - } else if(msg.key.fromMe) { // message was sent by us from a different device - type = 'sender' - // need to specially handle this case - if(isJidUser(msg.key.remoteJid!)) { - participant = author + + await sendReceipt(msg.key.remoteJid!, participant!, [msg.key.id!], type) + + // send ack for history message + const isAnyHistoryMsg = getHistoryMsg(msg.message!) + if(isAnyHistoryMsg) { + const jid = jidNormalizedUser(msg.key.remoteJid!) + await sendReceipt(jid, undefined, [msg.key.id!], 'hist_sync') } - } else if(!sendActiveReceipts) { - type = 'inactive' } - await sendReceipt(msg.key.remoteJid!, participant!, [msg.key.id!], type) + cleanMessage(msg, authState.creds.me!.id) - // send ack for history message - const isAnyHistoryMsg = getHistoryMsg(msg.message!) - if(isAnyHistoryMsg) { - const jid = jidNormalizedUser(msg.key.remoteJid!) - await sendReceipt(jid, undefined, [msg.key.id!], 'hist_sync') - } + await upsertMessage(msg, node.attrs.offline ? 'append' : 'notify') } - - cleanMessage(msg, authState.creds.me!.id) - - await upsertMessage(msg, node.attrs.offline ? 'append' : 'notify') - } - ), - sendMessageAck(node) - ]) + ) + ]) + } finally { + await sendMessageAck(node) + } } const fetchMessageHistory = async(