messaging: fix the hangs caused by the 66e error

This commit is contained in:
Rajeh Taher
2025-01-30 23:31:25 +02:00
parent 40ebf669dd
commit 3fca643d92
3 changed files with 121 additions and 15 deletions

View File

@@ -20,6 +20,8 @@ import {
getHistoryMsg, getHistoryMsg,
getNextPreKeys, getNextPreKeys,
getStatusFromReceiptType, hkdf, getStatusFromReceiptType, hkdf,
MISSING_KEYS_ERROR_TEXT,
NACK_REASONS,
NO_MESSAGE_FOUND_ERROR_TEXT, NO_MESSAGE_FOUND_ERROR_TEXT,
unixTimestampSeconds, unixTimestampSeconds,
xmppPreKey, xmppPreKey,
@@ -89,16 +91,20 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
let sendActiveReceipts = false let sendActiveReceipts = false
const sendMessageAck = async({ tag, attrs, content }: BinaryNode) => { const sendMessageAck = async({ tag, attrs, content }: BinaryNode, errorCode?: number) => {
const stanza: BinaryNode = { const stanza: BinaryNode = {
tag: 'ack', tag: 'ack',
attrs: { attrs: {
id: attrs.id, id: attrs.id,
to: attrs.from, to: attrs.from,
class: tag, class: tag
} }
} }
if(!!errorCode) {
stanza.attrs.error = errorCode.toString()
}
if(!!attrs.participant) { if(!!attrs.participant) {
stanza.attrs.participant = attrs.participant stanza.attrs.participant = attrs.participant
} }
@@ -107,7 +113,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
stanza.attrs.recipient = attrs.recipient stanza.attrs.recipient = attrs.recipient
} }
if(!!attrs.type && (tag !== 'message' || getBinaryNodeChild({ tag, attrs, content }, 'unavailable'))) { if(!!attrs.type && (tag !== 'message' || getBinaryNodeChild({ tag, attrs, content }, 'unavailable') || errorCode !== 0)) {
stanza.attrs.type = attrs.type stanza.attrs.type = attrs.type
} }
@@ -595,7 +601,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
} }
} }
const handleReceipt = async(node: BinaryNode) => { const handleReceipt = async(node: BinaryNode, offline: boolean) => {
const { attrs, content } = node const { attrs, content } = node
const isLid = attrs.from.includes('lid') const isLid = attrs.from.includes('lid')
const isNodeFromMe = areJidsSameUser(attrs.participant || attrs.from, isLid ? authState.creds.me?.lid : authState.creds.me?.id) const isNodeFromMe = areJidsSameUser(attrs.participant || attrs.from, isLid ? authState.creds.me?.lid : authState.creds.me?.id)
@@ -687,7 +693,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
} }
} }
const handleNotification = async(node: BinaryNode) => { const handleNotification = async(node: BinaryNode, offline: boolean) => {
const remoteJid = node.attrs.from const remoteJid = node.attrs.from
if(shouldIgnoreJid(remoteJid) && remoteJid !== '@s.whatsapp.net') { if(shouldIgnoreJid(remoteJid) && remoteJid !== '@s.whatsapp.net') {
logger.debug({ remoteJid, id: node.attrs.id }, 'ignored notification') logger.debug({ remoteJid, id: node.attrs.id }, 'ignored notification')
@@ -723,7 +729,12 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
} }
} }
const handleMessage = async(node: BinaryNode) => { const handleMessage = async(node: BinaryNode, offline: boolean) => {
if(offline) {
await sendMessageAck(node)
return
}
if(shouldIgnoreJid(node.attrs.from) && node.attrs.from !== '@s.whatsapp.net') { if(shouldIgnoreJid(node.attrs.from) && node.attrs.from !== '@s.whatsapp.net') {
logger.debug({ key: node.attrs.key }, 'ignored message') logger.debug({ key: node.attrs.key }, 'ignored message')
await sendMessageAck(node) await sendMessageAck(node)
@@ -771,6 +782,10 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
await decrypt() await decrypt()
// message failed to decrypt // message failed to decrypt
if(msg.messageStubType === proto.WebMessageInfo.StubType.CIPHERTEXT) { if(msg.messageStubType === proto.WebMessageInfo.StubType.CIPHERTEXT) {
if(msg?.messageStubParameters?.[0] === MISSING_KEYS_ERROR_TEXT) {
return sendMessageAck(node, NACK_REASONS.ParsingError)
}
retryMutex.mutex( retryMutex.mutex(
async() => { async() => {
if(ws.isOpen) { if(ws.isOpen) {
@@ -816,12 +831,14 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
cleanMessage(msg, authState.creds.me!.id) cleanMessage(msg, authState.creds.me!.id)
await sendMessageAck(node)
await upsertMessage(msg, node.attrs.offline ? 'append' : 'notify') await upsertMessage(msg, node.attrs.offline ? 'append' : 'notify')
} }
) )
]) ])
} finally { } catch(error) {
await sendMessageAck(node) logger.error({ error, node }, 'error in handling message')
} }
} }
@@ -965,35 +982,98 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
const processNodeWithBuffer = async<T>( const processNodeWithBuffer = async<T>(
node: BinaryNode, node: BinaryNode,
identifier: string, identifier: string,
exec: (node: BinaryNode) => Promise<T> exec: (node: BinaryNode, offline: boolean) => Promise<T>
) => { ) => {
ev.buffer() ev.buffer()
await execTask() await execTask()
ev.flush() ev.flush()
function execTask() { function execTask() {
return exec(node) return exec(node, false)
.catch(err => onUnexpectedError(err, identifier)) .catch(err => onUnexpectedError(err, identifier))
} }
} }
type MessageType = 'message' | 'call' | 'receipt' | 'notification'
type OfflineNode = {
type: MessageType
node: BinaryNode
}
const makeOfflineNodeProcessor = () => {
const nodeProcessorMap: Map<MessageType, (node: BinaryNode, offline: boolean) => Promise<void>> = new Map([
['message', handleMessage],
['call', handleCall],
['receipt', handleReceipt],
['notification', handleNotification]
])
const nodes: OfflineNode[] = []
let isProcessing = false
const enqueue = (type: MessageType, node: BinaryNode) => {
nodes.push({ type, node })
if(isProcessing) {
return
}
isProcessing = true
const promise = async() => {
while(nodes.length && ws.isOpen) {
const { type, node } = nodes.shift()!
const nodeProcessor = nodeProcessorMap.get(type)
if(!nodeProcessor) {
onUnexpectedError(
new Error(`unknown offline node type: ${type}`),
'processing offline node'
)
continue
}
await nodeProcessor(node, true)
}
isProcessing = false
}
promise().catch(error => onUnexpectedError(error, 'processing offline nodes'))
}
return { enqueue }
}
const offlineNodeProcessor = makeOfflineNodeProcessor()
const processNode = (type: MessageType, node: BinaryNode, identifier: string, exec: (node: BinaryNode, offline: boolean) => Promise<void>) => {
const isOffline = !!node.attrs.offline
if(isOffline) {
offlineNodeProcessor.enqueue(type, node)
} else {
processNodeWithBuffer(node, identifier, exec)
}
}
// recv a message // recv a message
ws.on('CB:message', (node: BinaryNode) => { ws.on('CB:message', (node: BinaryNode) => {
processNodeWithBuffer(node, 'processing message', handleMessage) processNode('message', node, 'processing message', handleMessage)
}) })
ws.on('CB:call', async(node: BinaryNode) => { ws.on('CB:call', async(node: BinaryNode) => {
processNodeWithBuffer(node, 'handling call', handleCall) processNode('call', node, 'handling call', handleCall)
}) })
ws.on('CB:receipt', node => { ws.on('CB:receipt', node => {
processNodeWithBuffer(node, 'handling receipt', handleReceipt) processNode('receipt', node, 'handling receipt', handleReceipt)
}) })
ws.on('CB:notification', async(node: BinaryNode) => { ws.on('CB:notification', async(node: BinaryNode) => {
processNodeWithBuffer(node, 'handling notification', handleNotification) processNode('notification', node, 'handling notification', handleNotification)
}) })
ws.on('CB:ack,class:message', (node: BinaryNode) => { ws.on('CB:ack,class:message', (node: BinaryNode) => {
handleBadAck(node) handleBadAck(node)
.catch(error => onUnexpectedError(error, 'handling bad ack')) .catch(error => onUnexpectedError(error, 'handling bad ack'))

View File

@@ -670,6 +670,15 @@ export const makeSocket = (config: SocketConfig) => {
end(new Boom('Multi-device beta not joined', { statusCode: DisconnectReason.multideviceMismatch })) end(new Boom('Multi-device beta not joined', { statusCode: DisconnectReason.multideviceMismatch }))
}) })
ws.on('CB:ib,,offline_preview', (node: BinaryNode) => {
logger.info('offline preview received', node)
sendNode({
tag: 'ib',
attrs: {},
content: [{ tag: 'offline_batch', attrs: { count: '100' } }]
})
})
ws.on('CB:ib,,edge_routing', (node: BinaryNode) => { ws.on('CB:ib,,edge_routing', (node: BinaryNode) => {
const edgeRoutingNode = getBinaryNodeChild(node, 'edge_routing') const edgeRoutingNode = getBinaryNodeChild(node, 'edge_routing')
const routingInfo = getBinaryNodeChild(edgeRoutingNode, 'routing_info') const routingInfo = getBinaryNodeChild(edgeRoutingNode, 'routing_info')

View File

@@ -6,6 +6,23 @@ import { areJidsSameUser, BinaryNode, isJidBroadcast, isJidGroup, isJidNewslette
import { unpadRandomMax16 } from './generics' import { unpadRandomMax16 } from './generics'
export const NO_MESSAGE_FOUND_ERROR_TEXT = 'Message absent from node' export const NO_MESSAGE_FOUND_ERROR_TEXT = 'Message absent from node'
export const MISSING_KEYS_ERROR_TEXT = 'Key used already or never filled'
export const NACK_REASONS = {
ParsingError: 487,
UnrecognizedStanza: 488,
UnrecognizedStanzaClass: 489,
UnrecognizedStanzaType: 490,
InvalidProtobuf: 491,
InvalidHostedCompanionStanza: 493,
MissingMessageSecret: 495,
SignalErrorOldCounter: 496,
MessageDeletedOnPeer: 499,
UnhandledError: 500,
UnsupportedAdminRevoke: 550,
UnsupportedLIDGroup: 551,
DBOperationFailed: 552
}
type MessageType = 'chat' | 'peer_broadcast' | 'other_broadcast' | 'group' | 'direct_peer_status' | 'other_status' | 'newsletter' type MessageType = 'chat' | 'peer_broadcast' | 'other_broadcast' | 'group' | 'direct_peer_status' | 'other_status' | 'newsletter'