feat: handle "peer_msg" receipts

This commit is contained in:
Adhiraj Singh
2022-04-02 15:45:09 +05:30
parent 8663598292
commit 334f85f8c5
2 changed files with 82 additions and 87 deletions

View File

@@ -30,7 +30,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
sendReceipt,
resyncMainAppState,
emitEventsFromMap,
uploadPreKeys
uploadPreKeys,
} = sock
/** this mutex ensures that the notifications (receipts, messages etc.) are processed in order */
@@ -142,31 +142,11 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
const processMessageLocal = async(message: proto.IWebMessageInfo) => {
const meId = authState.creds.me!.id
// send ack for history message
const isAnyHistoryMsg = !!message.message && !!normalizeMessageContent(message.message)?.protocolMessage?.historySyncNotification
if(isAnyHistoryMsg) {
await sendNode({
tag: 'receipt',
attrs: {
id: message.key.id,
type: 'hist_sync',
to: jidNormalizedUser(meId)
}
})
}
// process message and emit events
const newEvents = await processMessage(
message,
{ historyCache, meId, keyStore: authState.keys, logger, treatCiphertextMessagesAsReal }
)
if(isAnyHistoryMsg) {
logger.debug('restarting app sync timeout')
// restart the app state sync timeout
appStateSyncTimeout.start()
}
return newEvents
}
@@ -280,72 +260,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
}
}
// recv a message
ws.on('CB:message', (stanza: BinaryNode) => {
const { fullMessage: msg, decryptionTask } = decodeMessageStanza(stanza, authState)
processingMutex.mutex(
msg.key.remoteJid!,
async() => {
await decryptionTask
// message failed to decrypt
if(msg.messageStubType === proto.WebMessageInfo.WebMessageInfoStubType.CIPHERTEXT) {
logger.error(
{ msgId: msg.key.id, params: msg.messageStubParameters },
'failure in decrypting message'
)
retryMutex.mutex(
async() => {
if(ws.readyState === ws.OPEN) {
await sendRetryRequest(stanza)
if(retryRequestDelayMs) {
await delay(retryRequestDelayMs)
}
} else {
logger.debug({ stanza }, 'connection closed, ignoring retry req')
}
}
)
} else {
await sendMessageAck(stanza, { class: 'receipt' })
// no type in the receipt => message delivered
await sendReceipt(msg.key.remoteJid!, msg.key.participant, [msg.key.id!], undefined)
logger.debug({ msg: msg.key }, 'sent delivery receipt')
}
msg.key.remoteJid = jidNormalizedUser(msg.key.remoteJid!)
ev.emit('messages.upsert', { messages: [msg], type: stanza.attrs.offline ? 'append' : 'notify' })
}
)
.catch(
error => onUnexpectedError(error, 'processing message')
)
})
ws.on('CB:ack,class:message', async(node: BinaryNode) => {
sendNode({
tag: 'ack',
attrs: {
class: 'receipt',
id: node.attrs.id,
from: node.attrs.from
}
})
.catch(err => onUnexpectedError(err, 'ack message receipt'))
logger.debug({ attrs: node.attrs }, 'sending receipt for ack')
})
ws.on('CB:call', async(node: BinaryNode) => {
logger.info({ node }, 'recv call')
const [child] = getAllBinaryNodeChildren(node)
if(!!child?.tag) {
sendMessageAck(node, { class: 'call', type: child.tag })
.catch(
error => onUnexpectedError(error, 'ack call')
)
}
})
const sendMessagesAgain = async(key: proto.IMessageKey, ids: string[]) => {
const msgs = await Promise.all(
ids.map(id => (
@@ -512,6 +426,86 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
}
}
// recv a message
ws.on('CB:message', (stanza: BinaryNode) => {
const { fullMessage: msg, category, decryptionTask } = decodeMessageStanza(stanza, authState)
processingMutex.mutex(
msg.key.remoteJid!,
async() => {
await decryptionTask
// message failed to decrypt
if(msg.messageStubType === proto.WebMessageInfo.WebMessageInfoStubType.CIPHERTEXT) {
logger.error(
{ msgId: msg.key.id, params: msg.messageStubParameters },
'failure in decrypting message'
)
retryMutex.mutex(
async() => {
if(ws.readyState === ws.OPEN) {
await sendRetryRequest(stanza)
if(retryRequestDelayMs) {
await delay(retryRequestDelayMs)
}
} else {
logger.debug({ stanza }, 'connection closed, ignoring retry req')
}
}
)
} else {
await sendMessageAck(stanza, { class: 'receipt' })
// no type in the receipt => message delivered
await sendReceipt(msg.key.remoteJid!, msg.key.participant, [msg.key.id!], undefined)
// send ack for history message
const normalizedContent = !!msg.message ? normalizeMessageContent(msg.message) : undefined
const isAnyHistoryMsg = !!normalizedContent?.protocolMessage?.historySyncNotification
if(isAnyHistoryMsg) {
await sendReceipt(msg.key.remoteJid!, undefined, [msg.key.id], 'hist_sync')
// we only want to sync app state once we've all the history
// restart the app state sync timeout
logger.debug('restarting app sync timeout')
appStateSyncTimeout.start()
}
if(category === 'peer') {
await sendReceipt(msg.key.remoteJid!, undefined, [msg.key.id], 'peer_msg')
}
}
msg.key.remoteJid = jidNormalizedUser(msg.key.remoteJid!)
ev.emit('messages.upsert', { messages: [msg], type: stanza.attrs.offline ? 'append' : 'notify' })
}
)
.catch(
error => onUnexpectedError(error, 'processing message')
)
})
ws.on('CB:ack,class:message', async(node: BinaryNode) => {
sendNode({
tag: 'ack',
attrs: {
class: 'receipt',
id: node.attrs.id,
from: node.attrs.from
}
})
.catch(err => onUnexpectedError(err, 'ack message receipt'))
logger.debug({ attrs: node.attrs }, 'sending receipt for ack')
})
ws.on('CB:call', async(node: BinaryNode) => {
logger.info({ node }, 'recv call')
const [child] = getAllBinaryNodeChildren(node)
if(!!child?.tag) {
sendMessageAck(node, { class: 'call', type: child.tag })
.catch(
error => onUnexpectedError(error, 'ack call')
)
}
})
ws.on('CB:receipt', node => {
handleReceipt(node)
.catch(