diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index 385dd18..a268d99 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -2,7 +2,7 @@ import { Boom } from '@hapi/boom' import { proto } from '../../WAProto' import { AppStateChunk, Chat, ChatModification, ChatMutation, Contact, LTHashState, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAPatchCreate, WAPatchName, WAPresence } from '../Types' import { chatModificationToAppPatch, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, newLTHashState, toNumber } from '../Utils' -import makeMutex from '../Utils/make-mutex' +import { makeMutex } from '../Utils/make-mutex' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary' import { makeMessagesSocket } from './messages-send' diff --git a/src/Socket/messages-recv.ts b/src/Socket/messages-recv.ts index 4b9073c..554f7a1 100644 --- a/src/Socket/messages-recv.ts +++ b/src/Socket/messages-recv.ts @@ -3,6 +3,7 @@ import { proto } from '../../WAProto' import { KEY_BUNDLE_TYPE } from '../Defaults' import { Chat, GroupMetadata, MessageUserReceipt, ParticipantAction, SocketConfig, WAMessageStubType } from '../Types' import { decodeMessageStanza, downloadAndProcessHistorySyncNotification, encodeBigEndian, generateSignalPubKey, toNumber, xmppPreKey, xmppSignedPreKey } from '../Utils' +import { makeKeyedMutex } from '../Utils/make-mutex' import { areJidsSameUser, BinaryNode, BinaryNodeAttributes, getAllBinaryNodeChildren, getBinaryNodeChildren, isJidGroup, jidDecode, jidEncode, jidNormalizedUser } from '../WABinary' import { makeChatsSocket } from './chats' import { extractGroupMetadata } from './groups' @@ -37,6 +38,9 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { resyncMainAppState, } = sock + /** the mutex ensures that the notifications (receipts, messages etc.) are processed in order */ + const processingMutex = makeKeyedMutex() + const msgRetryMap = config.msgRetryCounterMap || { } const historyCache = new Set() @@ -338,24 +342,30 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } // recv a message - ws.on('CB:message', async(stanza: BinaryNode) => { - const msg = await decodeMessageStanza(stanza, authState) - // message failed to decrypt - if(msg.messageStubType === proto.WebMessageInfo.WebMessageInfoStubType.CIPHERTEXT) { - logger.error( - { msgId: msg.key.id, params: msg.messageStubParameters }, - 'failure in decrypting message' - ) - await sendRetryRequest(stanza) - } else { - await sendMessageAck(stanza, { class: 'receipt' }) - // no type in the receipt => message delivered - await sendReceipt(msg.key.remoteJid!, msg.key.participant, [msg.key.id!], undefined) - logger.debug({ msg: msg.key }, 'sent delivery receipt') - } - - msg.key.remoteJid = jidNormalizedUser(msg.key.remoteJid!) - ev.emit('messages.upsert', { messages: [msg], type: stanza.attrs.offline ? 'append' : 'notify' }) + ws.on('CB:message', (stanza: BinaryNode) => { + const { fullMessage: msg, decryptionTask } = decodeMessageStanza(stanza, authState) + processingMutex.mutex( + msg.key.remoteJid!, + async() => { + await decryptionTask + // message failed to decrypt + if(msg.messageStubType === proto.WebMessageInfo.WebMessageInfoStubType.CIPHERTEXT) { + logger.error( + { msgId: msg.key.id, params: msg.messageStubParameters }, + 'failure in decrypting message' + ) + await sendRetryRequest(stanza) + } else { + await sendMessageAck(stanza, { class: 'receipt' }) + // no type in the receipt => message delivered + await sendReceipt(msg.key.remoteJid!, msg.key.participant, [msg.key.id!], undefined) + logger.debug({ msg: msg.key }, 'sent delivery receipt') + } + + msg.key.remoteJid = jidNormalizedUser(msg.key.remoteJid!) + ev.emit('messages.upsert', { messages: [msg], type: stanza.attrs.offline ? 'append' : 'notify' }) + } + ) }) ws.on('CB:ack,class:message', async(node: BinaryNode) => { @@ -428,82 +438,92 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { participant: attrs.participant } - const status = getStatusFromReceiptType(attrs.type) - if( - typeof status !== 'undefined' && - ( - // basically, we only want to know when a message from us has been delivered to/read by the other person - // or another device of ours has read some messages - status > proto.WebMessageInfo.WebMessageInfoStatus.DELIVERY_ACK || - !isNodeFromMe - ) - ) { - if(isJidGroup(remoteJid)) { - const updateKey: keyof MessageUserReceipt = status === proto.WebMessageInfo.WebMessageInfoStatus.DELIVERY_ACK ? 'receiptTimestamp' : 'readTimestamp' - ev.emit( - 'message-receipt.update', - ids.map(id => ({ - key: { ...key, id }, - receipt: { - userJid: jidNormalizedUser(attrs.participant), - [updateKey]: +attrs.t - } - })) - ) - } else { - ev.emit( - 'messages.update', - ids.map(id => ({ - key: { ...key, id }, - update: { status } - })) - ) - } + await processingMutex.mutex( + remoteJid, + async() => { + const status = getStatusFromReceiptType(attrs.type) + if( + typeof status !== 'undefined' && + ( + // basically, we only want to know when a message from us has been delivered to/read by the other person + // or another device of ours has read some messages + status > proto.WebMessageInfo.WebMessageInfoStatus.DELIVERY_ACK || + !isNodeFromMe + ) + ) { + if(isJidGroup(remoteJid)) { + const updateKey: keyof MessageUserReceipt = status === proto.WebMessageInfo.WebMessageInfoStatus.DELIVERY_ACK ? 'receiptTimestamp' : 'readTimestamp' + ev.emit( + 'message-receipt.update', + ids.map(id => ({ + key: { ...key, id }, + receipt: { + userJid: jidNormalizedUser(attrs.participant), + [updateKey]: +attrs.t + } + })) + ) + } else { + ev.emit( + 'messages.update', + ids.map(id => ({ + key: { ...key, id }, + update: { status } + })) + ) + } - } - - if(attrs.type === 'retry') { - // correctly set who is asking for the retry - key.participant = key.participant || attrs.from - if(key.fromMe) { - try { - logger.debug({ attrs }, 'recv retry request') - await sendMessagesAgain(key, ids) - } catch(error) { - logger.error({ key, ids, trace: error.stack }, 'error in sending message again') - shouldAck = false } - } else { - logger.info({ attrs, key }, 'recv retry for not fromMe message') - } - } - if(shouldAck) { - await sendMessageAck(node, { class: 'receipt', type: attrs.type }) - } - + if(attrs.type === 'retry') { + // correctly set who is asking for the retry + key.participant = key.participant || attrs.from + if(key.fromMe) { + try { + logger.debug({ attrs }, 'recv retry request') + await sendMessagesAgain(key, ids) + } catch(error) { + logger.error({ key, ids, trace: error.stack }, 'error in sending message again') + shouldAck = false + } + } else { + logger.info({ attrs, key }, 'recv retry for not fromMe message') + } + } + + if(shouldAck) { + await sendMessageAck(node, { class: 'receipt', type: attrs.type }) + } + } + ) } ws.on('CB:receipt', handleReceipt) ws.on('CB:notification', async(node: BinaryNode) => { - await sendMessageAck(node, { class: 'notification', type: node.attrs.type }) - - const msg = processNotification(node) - if(msg) { - const fromMe = areJidsSameUser(node.attrs.participant || node.attrs.from, authState.creds.me!.id) - msg.key = { - remoteJid: node.attrs.from, - fromMe, - participant: node.attrs.participant, - id: node.attrs.id, - ...(msg.key || {}) + const remoteJid = node.attrs.from + processingMutex.mutex( + remoteJid, + () => { + const msg = processNotification(node) + if(msg) { + const fromMe = areJidsSameUser(node.attrs.participant || node.attrs.from, authState.creds.me!.id) + msg.key = { + remoteJid: node.attrs.from, + fromMe, + participant: node.attrs.participant, + id: node.attrs.id, + ...(msg.key || {}) + } + msg.messageTimestamp = +node.attrs.t + + const fullMsg = proto.WebMessageInfo.fromObject(msg) + ev.emit('messages.upsert', { messages: [fullMsg], type: 'append' }) + } } - msg.messageTimestamp = +node.attrs.t - - const fullMsg = proto.WebMessageInfo.fromObject(msg) - ev.emit('messages.upsert', { messages: [fullMsg], type: 'append' }) - } + ) + + await sendMessageAck(node, { class: 'notification', type: node.attrs.type }) }) ev.on('messages.upsert', async({ messages, type }) => { @@ -520,7 +540,11 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } } - await processMessage(msg, chat) + await processingMutex.mutex( + 'p-' + chat.id!, + () => processMessage(msg, chat) + ) + if(!!msg.message && !msg.message!.protocolMessage) { chat.conversationTimestamp = toNumber(msg.messageTimestamp) if(!msg.key.fromMe) { diff --git a/src/Utils/decode-wa-message.ts b/src/Utils/decode-wa-message.ts index 0c947a4..320ca5e 100644 --- a/src/Utils/decode-wa-message.ts +++ b/src/Utils/decode-wa-message.ts @@ -7,7 +7,7 @@ import { decryptGroupSignalProto, decryptSignalProto, processSenderKeyMessage } type MessageType = 'chat' | 'peer_broadcast' | 'other_broadcast' | 'group' | 'direct_peer_status' | 'other_status' -export const decodeMessageStanza = async(stanza: BinaryNode, auth: AuthenticationState) => { +export const decodeMessageStanza = (stanza: BinaryNode, auth: AuthenticationState) => { //const deviceIdentity = (stanza.content as BinaryNodeM[])?.find(m => m.tag === 'device-identity') //const deviceIdentityBytes = deviceIdentity ? deviceIdentity.content as Buffer : undefined @@ -81,48 +81,51 @@ export const decodeMessageStanza = async(stanza: BinaryNode, auth: Authenticatio fullMessage.status = proto.WebMessageInfo.WebMessageInfoStatus.SERVER_ACK } - if(Array.isArray(stanza.content)) { - for(const { tag, attrs, content } of stanza.content) { - if(tag !== 'enc') { - continue + return { + fullMessage, + decryptionTask: (async() => { + if(Array.isArray(stanza.content)) { + for(const { tag, attrs, content } of stanza.content) { + if(tag !== 'enc') { + continue + } + + if(!(content instanceof Uint8Array)) { + continue + } + + let msgBuffer: Buffer + + try { + const e2eType = attrs.type + switch (e2eType) { + case 'skmsg': + msgBuffer = await decryptGroupSignalProto(sender, author, content, auth) + break + case 'pkmsg': + case 'msg': + const user = isJidUser(sender) ? sender : author + msgBuffer = await decryptSignalProto(user, e2eType, content as Buffer, auth) + break + } + + let msg: proto.IMessage = proto.Message.decode(unpadRandomMax16(msgBuffer)) + msg = msg.deviceSentMessage?.message || msg + if(msg.senderKeyDistributionMessage) { + await processSenderKeyMessage(author, msg.senderKeyDistributionMessage, auth) + } + + if(fullMessage.message) { + Object.assign(fullMessage.message, msg) + } else { + fullMessage.message = msg + } + } catch(error) { + fullMessage.messageStubType = proto.WebMessageInfo.WebMessageInfoStubType.CIPHERTEXT + fullMessage.messageStubParameters = [error.message] + } + } } - - if(!(content instanceof Uint8Array)) { - continue - } - - let msgBuffer: Buffer - - try { - const e2eType = attrs.type - switch (e2eType) { - case 'skmsg': - msgBuffer = await decryptGroupSignalProto(sender, author, content, auth) - break - case 'pkmsg': - case 'msg': - const user = isJidUser(sender) ? sender : author - msgBuffer = await decryptSignalProto(user, e2eType, content as Buffer, auth) - break - } - - let msg: proto.IMessage = proto.Message.decode(unpadRandomMax16(msgBuffer)) - msg = msg.deviceSentMessage?.message || msg - if(msg.senderKeyDistributionMessage) { - await processSenderKeyMessage(author, msg.senderKeyDistributionMessage, auth) - } - - if(fullMessage.message) { - Object.assign(fullMessage.message, msg) - } else { - fullMessage.message = msg - } - } catch(error) { - fullMessage.messageStubType = proto.WebMessageInfo.WebMessageInfoStubType.CIPHERTEXT - fullMessage.messageStubParameters = [error.message] - } - } + })() } - - return fullMessage } \ No newline at end of file diff --git a/src/Utils/make-mutex.ts b/src/Utils/make-mutex.ts index 8903b26..d897903 100644 --- a/src/Utils/make-mutex.ts +++ b/src/Utils/make-mutex.ts @@ -1,22 +1,36 @@ - -export default () => { +export const makeMutex = () => { let task = Promise.resolve() as Promise return { - mutex(code: () => Promise):Promise { + mutex(code: () => Promise | T): Promise { task = (async() => { - // wait for the previous task to complete - // if there is an error, we swallow so as to not block the queue - try { - await task + // wait for the previous task to complete + // if there is an error, we swallow so as to not block the queue + try { + await task } catch{ } - // execute the current task - return code() + // execute the current task + return code() })() // we replace the existing task, appending the new piece of execution to it // so the next task will have to wait for this one to finish return task - }, + }, + } +} + +export type Mutex = ReturnType + +export const makeKeyedMutex = () => { + const map: { [id: string]: Mutex } = {} + + return { + mutex(key: string, task: () => Promise | T): Promise { + if(!map[key]) { + map[key] = makeMutex() + } + + return map[key].mutex(task) + } } } - \ No newline at end of file