feat: handle bad message acks + send retry to all correctly

This commit is contained in:
Adhiraj Singh
2022-06-22 14:50:48 +05:30
parent d61d333c5b
commit 721d0f32d6
3 changed files with 53 additions and 18 deletions

View File

@@ -1,7 +1,7 @@
import { proto } from '../../WAProto'
import { KEY_BUNDLE_TYPE, MIN_PREKEY_COUNT } from '../Defaults'
import { BaileysEventMap, InitialReceivedChatsState, MessageReceiptType, MessageUserReceipt, SocketConfig, WACallEvent, WAMessageStubType } from '../Types'
import { BaileysEventMap, InitialReceivedChatsState, MessageReceiptType, MessageRelayOptions, MessageUserReceipt, SocketConfig, WACallEvent, WAMessageKey, WAMessageStubType } from '../Types'
import { debouncedTimeout, decodeMediaRetryNode, decodeMessageStanza, delay, encodeBigEndian, generateSignalPubKey, getCallStatusFromNode, getNextPreKeys, getStatusFromReceiptType, normalizeMessageContent, unixTimestampSeconds, xmppPreKey, xmppSignedPreKey } from '../Utils'
import { makeKeyedMutex, makeMutex } from '../Utils/make-mutex'
import processMessage, { cleanMessage } from '../Utils/process-message'
@@ -14,7 +14,8 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
logger,
treatCiphertextMessagesAsReal,
retryRequestDelayMs,
downloadHistory
downloadHistory,
getMessage
} = config
const sock = makeChatsSocket(config)
const {
@@ -316,28 +317,33 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
}
const sendMessagesAgain = async(key: proto.IMessageKey, ids: string[]) => {
const msgs = await Promise.all(
ids.map(id => (
config.getMessage({ ...key, id })
))
)
const msgs = await Promise.all(ids.map(id => getMessage({ ...key, id })))
const participant = key.participant || key.remoteJid
// if it's the primary jid sending the request
// just re-send the message to everyone
// prevents the first message decryption failure
const sendToAll = !jidDecode(participant).device
await assertSessions([participant], true)
if(isJidGroup(key.remoteJid)) {
await authState.keys.set({ 'sender-key-memory': { [key.remoteJid]: null } })
}
logger.debug({ participant }, 'forced new session for retry recp')
logger.debug({ participant, sendToAll }, 'forced new session for retry recp')
for(let i = 0; i < msgs.length;i++) {
if(msgs[i]) {
updateSendMessageAgainCount(ids[i], participant)
await relayMessage(key.remoteJid, msgs[i], {
messageId: ids[i],
participant
})
const msgRelayOpts: MessageRelayOptions = { messageId: ids[i] }
if(sendToAll) {
msgRelayOpts.useUserDevicesCache = false
} else {
msgRelayOpts.participant = participant
}
await relayMessage(key.remoteJid, msgs[i], msgRelayOpts)
} else {
logger.debug({ jid: key.remoteJid, id: ids[i] }, 'recv retry request, but message not available')
}
@@ -475,6 +481,22 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
}
}
const handleBadAck = async({ attrs }: BinaryNode) => {
// current hypothesis is that if pash is sent in the ack
// it means -- the message hasn't reached all devices yet
// we'll retry sending the message here
if(attrs.phash) {
logger.info({ attrs }, 'received phash in ack, resending message...')
const key: WAMessageKey = { remoteJid: attrs.from, fromMe: true, id: attrs.id }
const msg = await getMessage(key)
if(msg) {
await relayMessage(key.remoteJid, msg, { messageId: key.id, useUserDevicesCache: false })
} else {
logger.warn({ attrs }, 'could not send message again, as it was not found')
}
}
}
// recv a message
ws.on('CB:message', (stanza: BinaryNode) => {
const { fullMessage: msg, category, author, decryptionTask } = decodeMessageStanza(stanza, authState)
@@ -585,6 +607,11 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
)
})
ws.on('CB:ack,class:message', (node: BinaryNode) => {
handleBadAck(node)
.catch(error => onUnexpectedError(error, 'handling bad ack'))
})
ev.on('messages.upsert', data => {
handleUpsertedMessages(data)
.catch(

View File

@@ -141,15 +141,20 @@ export const makeMessagesSocket = (config: SocketConfig) => {
}
}
const getUSyncDevices = async(jids: string[], ignoreZeroDevices: boolean) => {
/** Fetch all the devices we've to send a message to */
const getUSyncDevices = async(jids: string[], useCache: boolean, ignoreZeroDevices: boolean) => {
const deviceResults: JidWithDevice[] = []
if(!useCache) {
logger.debug('not using cache for devices')
}
const users: BinaryNode[] = []
jids = Array.from(new Set(jids))
for(let jid of jids) {
const user = jidDecode(jid).user
jid = jidNormalizedUser(jid)
if(userDevicesCache.has(user)) {
if(userDevicesCache.has(user) && useCache) {
const devices: JidWithDevice[] = userDevicesCache.get(user)
deviceResults.push(...devices)
@@ -285,7 +290,7 @@ export const makeMessagesSocket = (config: SocketConfig) => {
const relayMessage = async(
jid: string,
message: proto.IMessage,
{ messageId: msgId, participant, additionalAttributes, cachedGroupMetadata }: MessageRelayOptions
{ messageId: msgId, participant, additionalAttributes, useUserDevicesCache, cachedGroupMetadata }: MessageRelayOptions
) => {
const meId = authState.creds.me!.id
@@ -294,6 +299,7 @@ export const makeMessagesSocket = (config: SocketConfig) => {
const { user, server } = jidDecode(jid)
const isGroup = server === 'g.us'
msgId = msgId || generateMessageID()
useUserDevicesCache = useUserDevicesCache !== false
const encodedMsg = encodeWAMessage(message)
const participants: BinaryNode[] = []
@@ -345,7 +351,7 @@ export const makeMessagesSocket = (config: SocketConfig) => {
if(!participant) {
const participantsList = groupData.participants.map(p => p.id)
const additionalDevices = await getUSyncDevices(participantsList, false)
const additionalDevices = await getUSyncDevices(participantsList, useUserDevicesCache, false)
devices.push(...additionalDevices)
}
@@ -401,7 +407,7 @@ export const makeMessagesSocket = (config: SocketConfig) => {
devices.push({ user })
devices.push({ user: meUser })
const additionalDevices = await getUSyncDevices([ meId, jid ], true)
const additionalDevices = await getUSyncDevices([ meId, jid ], useUserDevicesCache, true)
devices.push(...additionalDevices)
}

View File

@@ -93,7 +93,7 @@ export type AnyMediaMessageContent = (
seconds?: number
} | ({
sticker: WAMediaUpload
isAnimated?: boolean
isAnimated?: boolean
} & WithDimensions) | ({
document: WAMediaUpload
mimetype: string
@@ -153,6 +153,8 @@ export type MessageRelayOptions = MinimalRelayOptions & {
participant?: string
/** additional attributes to add to the WA binary node */
additionalAttributes?: { [_: string]: string }
/** should we use the devices cache, or fetch afresh from the server; default assumed to be "true" */
useUserDevicesCache?: boolean
}
export type MiscMessageGenerationOptions = MinimalRelayOptions & {