From 3fca643d92ef6fd4648059be3e0f4c13fa50e9d9 Mon Sep 17 00:00:00 2001 From: Rajeh Taher Date: Thu, 30 Jan 2025 23:31:25 +0200 Subject: [PATCH] messaging: fix the hangs caused by the 66e error --- src/Socket/messages-recv.ts | 110 ++++++++++++++++++++++++++++----- src/Socket/socket.ts | 9 +++ src/Utils/decode-wa-message.ts | 17 +++++ 3 files changed, 121 insertions(+), 15 deletions(-) diff --git a/src/Socket/messages-recv.ts b/src/Socket/messages-recv.ts index 0b9dd55..33444e6 100644 --- a/src/Socket/messages-recv.ts +++ b/src/Socket/messages-recv.ts @@ -20,6 +20,8 @@ import { getHistoryMsg, getNextPreKeys, getStatusFromReceiptType, hkdf, + MISSING_KEYS_ERROR_TEXT, + NACK_REASONS, NO_MESSAGE_FOUND_ERROR_TEXT, unixTimestampSeconds, xmppPreKey, @@ -89,16 +91,20 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { let sendActiveReceipts = false - const sendMessageAck = async({ tag, attrs, content }: BinaryNode) => { + const sendMessageAck = async({ tag, attrs, content }: BinaryNode, errorCode?: number) => { const stanza: BinaryNode = { tag: 'ack', attrs: { id: attrs.id, to: attrs.from, - class: tag, + class: tag } } + if(!!errorCode) { + stanza.attrs.error = errorCode.toString() + } + if(!!attrs.participant) { stanza.attrs.participant = attrs.participant } @@ -107,7 +113,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { stanza.attrs.recipient = attrs.recipient } - if(!!attrs.type && (tag !== 'message' || getBinaryNodeChild({ tag, attrs, content }, 'unavailable'))) { + if(!!attrs.type && (tag !== 'message' || getBinaryNodeChild({ tag, attrs, content }, 'unavailable') || errorCode !== 0)) { stanza.attrs.type = attrs.type } @@ -595,7 +601,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } } - const handleReceipt = async(node: BinaryNode) => { + const handleReceipt = async(node: BinaryNode, offline: boolean) => { const { attrs, content } = node const isLid = attrs.from.includes('lid') const isNodeFromMe = areJidsSameUser(attrs.participant || attrs.from, isLid ? authState.creds.me?.lid : authState.creds.me?.id) @@ -687,7 +693,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } } - const handleNotification = async(node: BinaryNode) => { + const handleNotification = async(node: BinaryNode, offline: boolean) => { const remoteJid = node.attrs.from if(shouldIgnoreJid(remoteJid) && remoteJid !== '@s.whatsapp.net') { logger.debug({ remoteJid, id: node.attrs.id }, 'ignored notification') @@ -723,7 +729,12 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } } - const handleMessage = async(node: BinaryNode) => { + const handleMessage = async(node: BinaryNode, offline: boolean) => { + if(offline) { + await sendMessageAck(node) + return + } + if(shouldIgnoreJid(node.attrs.from) && node.attrs.from !== '@s.whatsapp.net') { logger.debug({ key: node.attrs.key }, 'ignored message') await sendMessageAck(node) @@ -771,6 +782,10 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { await decrypt() // message failed to decrypt if(msg.messageStubType === proto.WebMessageInfo.StubType.CIPHERTEXT) { + if(msg?.messageStubParameters?.[0] === MISSING_KEYS_ERROR_TEXT) { + return sendMessageAck(node, NACK_REASONS.ParsingError) + } + retryMutex.mutex( async() => { if(ws.isOpen) { @@ -816,12 +831,14 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { cleanMessage(msg, authState.creds.me!.id) + await sendMessageAck(node) + await upsertMessage(msg, node.attrs.offline ? 'append' : 'notify') } ) ]) - } finally { - await sendMessageAck(node) + } catch(error) { + logger.error({ error, node }, 'error in handling message') } } @@ -965,35 +982,98 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { const processNodeWithBuffer = async( node: BinaryNode, identifier: string, - exec: (node: BinaryNode) => Promise + exec: (node: BinaryNode, offline: boolean) => Promise ) => { ev.buffer() await execTask() ev.flush() function execTask() { - return exec(node) + return exec(node, false) .catch(err => onUnexpectedError(err, identifier)) } } + type MessageType = 'message' | 'call' | 'receipt' | 'notification' + + type OfflineNode = { + type: MessageType + node: BinaryNode + } + + const makeOfflineNodeProcessor = () => { + const nodeProcessorMap: Map Promise> = new Map([ + ['message', handleMessage], + ['call', handleCall], + ['receipt', handleReceipt], + ['notification', handleNotification] + ]) + const nodes: OfflineNode[] = [] + let isProcessing = false + + const enqueue = (type: MessageType, node: BinaryNode) => { + nodes.push({ type, node }) + + if(isProcessing) { + return + } + + isProcessing = true + + const promise = async() => { + while(nodes.length && ws.isOpen) { + const { type, node } = nodes.shift()! + + const nodeProcessor = nodeProcessorMap.get(type) + + if(!nodeProcessor) { + onUnexpectedError( + new Error(`unknown offline node type: ${type}`), + 'processing offline node' + ) + continue + } + + await nodeProcessor(node, true) + } + + isProcessing = false + } + + promise().catch(error => onUnexpectedError(error, 'processing offline nodes')) + } + + return { enqueue } + } + + const offlineNodeProcessor = makeOfflineNodeProcessor() + + const processNode = (type: MessageType, node: BinaryNode, identifier: string, exec: (node: BinaryNode, offline: boolean) => Promise) => { + const isOffline = !!node.attrs.offline + + if(isOffline) { + offlineNodeProcessor.enqueue(type, node) + } else { + processNodeWithBuffer(node, identifier, exec) + } + } + // recv a message ws.on('CB:message', (node: BinaryNode) => { - processNodeWithBuffer(node, 'processing message', handleMessage) + processNode('message', node, 'processing message', handleMessage) }) ws.on('CB:call', async(node: BinaryNode) => { - processNodeWithBuffer(node, 'handling call', handleCall) + processNode('call', node, 'handling call', handleCall) }) ws.on('CB:receipt', node => { - processNodeWithBuffer(node, 'handling receipt', handleReceipt) + processNode('receipt', node, 'handling receipt', handleReceipt) }) ws.on('CB:notification', async(node: BinaryNode) => { - processNodeWithBuffer(node, 'handling notification', handleNotification) + processNode('notification', node, 'handling notification', handleNotification) }) - ws.on('CB:ack,class:message', (node: BinaryNode) => { handleBadAck(node) .catch(error => onUnexpectedError(error, 'handling bad ack')) diff --git a/src/Socket/socket.ts b/src/Socket/socket.ts index 3fb2d88..64a4486 100644 --- a/src/Socket/socket.ts +++ b/src/Socket/socket.ts @@ -670,6 +670,15 @@ export const makeSocket = (config: SocketConfig) => { end(new Boom('Multi-device beta not joined', { statusCode: DisconnectReason.multideviceMismatch })) }) + ws.on('CB:ib,,offline_preview', (node: BinaryNode) => { + logger.info('offline preview received', node) + sendNode({ + tag: 'ib', + attrs: {}, + content: [{ tag: 'offline_batch', attrs: { count: '100' } }] + }) + }) + ws.on('CB:ib,,edge_routing', (node: BinaryNode) => { const edgeRoutingNode = getBinaryNodeChild(node, 'edge_routing') const routingInfo = getBinaryNodeChild(edgeRoutingNode, 'routing_info') diff --git a/src/Utils/decode-wa-message.ts b/src/Utils/decode-wa-message.ts index d3f5082..d8c0d86 100644 --- a/src/Utils/decode-wa-message.ts +++ b/src/Utils/decode-wa-message.ts @@ -6,6 +6,23 @@ import { areJidsSameUser, BinaryNode, isJidBroadcast, isJidGroup, isJidNewslette import { unpadRandomMax16 } from './generics' export const NO_MESSAGE_FOUND_ERROR_TEXT = 'Message absent from node' +export const MISSING_KEYS_ERROR_TEXT = 'Key used already or never filled' + +export const NACK_REASONS = { + ParsingError: 487, + UnrecognizedStanza: 488, + UnrecognizedStanzaClass: 489, + UnrecognizedStanzaType: 490, + InvalidProtobuf: 491, + InvalidHostedCompanionStanza: 493, + MissingMessageSecret: 495, + SignalErrorOldCounter: 496, + MessageDeletedOnPeer: 499, + UnhandledError: 500, + UnsupportedAdminRevoke: 550, + UnsupportedLIDGroup: 551, + DBOperationFailed: 552 +} type MessageType = 'chat' | 'peer_broadcast' | 'other_broadcast' | 'group' | 'direct_peer_status' | 'other_status' | 'newsletter'