hotfix: always send acks even in case of errors (#1043)

This commit is contained in:
vini
2024-09-22 08:35:50 -03:00
committed by GitHub
parent fda2689169
commit 71082bf1b3

View File

@@ -622,67 +622,70 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
ids.push(...items.map(i => i.attrs.id)) ids.push(...items.map(i => i.attrs.id))
} }
await Promise.all([ try {
processingMutex.mutex( await Promise.all([
async() => { processingMutex.mutex(
const status = getStatusFromReceiptType(attrs.type) async() => {
if( const status = getStatusFromReceiptType(attrs.type)
typeof status !== 'undefined' && if(
( typeof status !== 'undefined' &&
// basically, we only want to know when a message from us has been delivered to/read by the other person (
// or another device of ours has read some messages // basically, we only want to know when a message from us has been delivered to/read by the other person
status > proto.WebMessageInfo.Status.DELIVERY_ACK || // or another device of ours has read some messages
!isNodeFromMe status > proto.WebMessageInfo.Status.DELIVERY_ACK ||
) !isNodeFromMe
) { )
if(isJidGroup(remoteJid) || isJidStatusBroadcast(remoteJid)) { ) {
if(attrs.participant) { if(isJidGroup(remoteJid) || isJidStatusBroadcast(remoteJid)) {
const updateKey: keyof MessageUserReceipt = status === proto.WebMessageInfo.Status.DELIVERY_ACK ? 'receiptTimestamp' : 'readTimestamp' if(attrs.participant) {
const updateKey: keyof MessageUserReceipt = status === proto.WebMessageInfo.Status.DELIVERY_ACK ? 'receiptTimestamp' : 'readTimestamp'
ev.emit(
'message-receipt.update',
ids.map(id => ({
key: { ...key, id },
receipt: {
userJid: jidNormalizedUser(attrs.participant),
[updateKey]: +attrs.t
}
}))
)
}
} else {
ev.emit( ev.emit(
'message-receipt.update', 'messages.update',
ids.map(id => ({ ids.map(id => ({
key: { ...key, id }, key: { ...key, id },
receipt: { update: { status }
userJid: jidNormalizedUser(attrs.participant),
[updateKey]: +attrs.t
}
})) }))
) )
} }
} else {
ev.emit(
'messages.update',
ids.map(id => ({
key: { ...key, id },
update: { status }
}))
)
} }
}
if(attrs.type === 'retry') { if(attrs.type === 'retry') {
// correctly set who is asking for the retry // correctly set who is asking for the retry
key.participant = key.participant || attrs.from key.participant = key.participant || attrs.from
const retryNode = getBinaryNodeChild(node, 'retry') const retryNode = getBinaryNodeChild(node, 'retry')
if(willSendMessageAgain(ids[0], key.participant)) { if(willSendMessageAgain(ids[0], key.participant)) {
if(key.fromMe) { if(key.fromMe) {
try { try {
logger.debug({ attrs, key }, 'recv retry request') logger.debug({ attrs, key }, 'recv retry request')
await sendMessagesAgain(key, ids, retryNode!) await sendMessagesAgain(key, ids, retryNode!)
} catch(error) { } catch(error) {
logger.error({ key, ids, trace: error.stack }, 'error in sending message again') logger.error({ key, ids, trace: error.stack }, 'error in sending message again')
}
} else {
logger.info({ attrs, key }, 'recv retry for not fromMe message')
} }
} else { } else {
logger.info({ attrs, key }, 'recv retry for not fromMe message') logger.info({ attrs, key }, 'will not send message again, as sent too many times')
} }
} else {
logger.info({ attrs, key }, 'will not send message again, as sent too many times')
} }
} }
} )
), ])
sendMessageAck(node) } finally {
]) await sendMessageAck(node)
}
} }
const handleNotification = async(node: BinaryNode) => { const handleNotification = async(node: BinaryNode) => {
@@ -693,29 +696,32 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
return return
} }
await Promise.all([ try {
processingMutex.mutex( await Promise.all([
async() => { processingMutex.mutex(
const msg = await processNotification(node) async() => {
if(msg) { const msg = await processNotification(node)
const fromMe = areJidsSameUser(node.attrs.participant || remoteJid, authState.creds.me!.id) if(msg) {
msg.key = { const fromMe = areJidsSameUser(node.attrs.participant || remoteJid, authState.creds.me!.id)
remoteJid, msg.key = {
fromMe, remoteJid,
participant: node.attrs.participant, fromMe,
id: node.attrs.id, participant: node.attrs.participant,
...(msg.key || {}) id: node.attrs.id,
} ...(msg.key || {})
msg.participant ??= node.attrs.participant }
msg.messageTimestamp = +node.attrs.t msg.participant ??= node.attrs.participant
msg.messageTimestamp = +node.attrs.t
const fullMsg = proto.WebMessageInfo.fromObject(msg) const fullMsg = proto.WebMessageInfo.fromObject(msg)
await upsertMessage(fullMsg, 'append') await upsertMessage(fullMsg, 'append')
}
} }
} )
), ])
sendMessageAck(node) } finally {
]) await sendMessageAck(node)
}
} }
const handleMessage = async(node: BinaryNode) => { const handleMessage = async(node: BinaryNode) => {
@@ -761,62 +767,65 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
} }
} }
await Promise.all([ try {
processingMutex.mutex( await Promise.all([
async() => { processingMutex.mutex(
await decrypt() async() => {
// message failed to decrypt await decrypt()
if(msg.messageStubType === proto.WebMessageInfo.StubType.CIPHERTEXT) { // message failed to decrypt
retryMutex.mutex( if(msg.messageStubType === proto.WebMessageInfo.StubType.CIPHERTEXT) {
async() => { retryMutex.mutex(
if(ws.isOpen) { async() => {
if(getBinaryNodeChild(node, 'unavailable')) { if(ws.isOpen) {
return if(getBinaryNodeChild(node, 'unavailable')) {
} return
}
const encNode = getBinaryNodeChild(node, 'enc') const encNode = getBinaryNodeChild(node, 'enc')
await sendRetryRequest(node, !encNode) await sendRetryRequest(node, !encNode)
if(retryRequestDelayMs) { if(retryRequestDelayMs) {
await delay(retryRequestDelayMs) await delay(retryRequestDelayMs)
}
} else {
logger.debug({ node }, 'connection closed, ignoring retry req')
} }
} else {
logger.debug({ node }, 'connection closed, ignoring retry req')
} }
)
} else {
// no type in the receipt => message delivered
let type: MessageReceiptType = undefined
let participant = msg.key.participant
if(category === 'peer') { // special peer message
type = 'peer_msg'
} else if(msg.key.fromMe) { // message was sent by us from a different device
type = 'sender'
// need to specially handle this case
if(isJidUser(msg.key.remoteJid!)) {
participant = author
}
} else if(!sendActiveReceipts) {
type = 'inactive'
} }
)
} else { await sendReceipt(msg.key.remoteJid!, participant!, [msg.key.id!], type)
// no type in the receipt => message delivered
let type: MessageReceiptType = undefined // send ack for history message
let participant = msg.key.participant const isAnyHistoryMsg = getHistoryMsg(msg.message!)
if(category === 'peer') { // special peer message if(isAnyHistoryMsg) {
type = 'peer_msg' const jid = jidNormalizedUser(msg.key.remoteJid!)
} else if(msg.key.fromMe) { // message was sent by us from a different device await sendReceipt(jid, undefined, [msg.key.id!], 'hist_sync')
type = 'sender'
// need to specially handle this case
if(isJidUser(msg.key.remoteJid!)) {
participant = author
} }
} else if(!sendActiveReceipts) {
type = 'inactive'
} }
await sendReceipt(msg.key.remoteJid!, participant!, [msg.key.id!], type) cleanMessage(msg, authState.creds.me!.id)
// send ack for history message await upsertMessage(msg, node.attrs.offline ? 'append' : 'notify')
const isAnyHistoryMsg = getHistoryMsg(msg.message!)
if(isAnyHistoryMsg) {
const jid = jidNormalizedUser(msg.key.remoteJid!)
await sendReceipt(jid, undefined, [msg.key.id!], 'hist_sync')
}
} }
)
cleanMessage(msg, authState.creds.me!.id) ])
} finally {
await upsertMessage(msg, node.attrs.offline ? 'append' : 'notify') await sendMessageAck(node)
} }
),
sendMessageAck(node)
])
} }
const fetchMessageHistory = async( const fetchMessageHistory = async(