diff --git a/src/Socket/messages-recv.ts b/src/Socket/messages-recv.ts index 05ac438..a070547 100644 --- a/src/Socket/messages-recv.ts +++ b/src/Socket/messages-recv.ts @@ -1,7 +1,7 @@ import { proto } from '../../WAProto' import { KEY_BUNDLE_TYPE, MIN_PREKEY_COUNT } from '../Defaults' -import { BaileysEventMap, InitialReceivedChatsState, MessageReceiptType, MessageUserReceipt, SocketConfig, WACallEvent, WAMessageStubType } from '../Types' +import { BaileysEventMap, InitialReceivedChatsState, MessageReceiptType, MessageRelayOptions, MessageUserReceipt, SocketConfig, WACallEvent, WAMessageKey, WAMessageStubType } from '../Types' import { debouncedTimeout, decodeMediaRetryNode, decodeMessageStanza, delay, encodeBigEndian, generateSignalPubKey, getCallStatusFromNode, getNextPreKeys, getStatusFromReceiptType, normalizeMessageContent, unixTimestampSeconds, xmppPreKey, xmppSignedPreKey } from '../Utils' import { makeKeyedMutex, makeMutex } from '../Utils/make-mutex' import processMessage, { cleanMessage } from '../Utils/process-message' @@ -14,7 +14,8 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { logger, treatCiphertextMessagesAsReal, retryRequestDelayMs, - downloadHistory + downloadHistory, + getMessage } = config const sock = makeChatsSocket(config) const { @@ -316,28 +317,33 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } const sendMessagesAgain = async(key: proto.IMessageKey, ids: string[]) => { - const msgs = await Promise.all( - ids.map(id => ( - config.getMessage({ ...key, id }) - )) - ) + const msgs = await Promise.all(ids.map(id => getMessage({ ...key, id }))) const participant = key.participant || key.remoteJid + // if it's the primary jid sending the request + // just re-send the message to everyone + // prevents the first message decryption failure + const sendToAll = !jidDecode(participant).device await assertSessions([participant], true) if(isJidGroup(key.remoteJid)) { await authState.keys.set({ 'sender-key-memory': { [key.remoteJid]: null } }) } - logger.debug({ participant }, 'forced new session for retry recp') + logger.debug({ participant, sendToAll }, 'forced new session for retry recp') for(let i = 0; i < msgs.length;i++) { if(msgs[i]) { updateSendMessageAgainCount(ids[i], participant) - await relayMessage(key.remoteJid, msgs[i], { - messageId: ids[i], - participant - }) + const msgRelayOpts: MessageRelayOptions = { messageId: ids[i] } + + if(sendToAll) { + msgRelayOpts.useUserDevicesCache = false + } else { + msgRelayOpts.participant = participant + } + + await relayMessage(key.remoteJid, msgs[i], msgRelayOpts) } else { logger.debug({ jid: key.remoteJid, id: ids[i] }, 'recv retry request, but message not available') } @@ -475,6 +481,22 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } } + const handleBadAck = async({ attrs }: BinaryNode) => { + // current hypothesis is that if pash is sent in the ack + // it means -- the message hasn't reached all devices yet + // we'll retry sending the message here + if(attrs.phash) { + logger.info({ attrs }, 'received phash in ack, resending message...') + const key: WAMessageKey = { remoteJid: attrs.from, fromMe: true, id: attrs.id } + const msg = await getMessage(key) + if(msg) { + await relayMessage(key.remoteJid, msg, { messageId: key.id, useUserDevicesCache: false }) + } else { + logger.warn({ attrs }, 'could not send message again, as it was not found') + } + } + } + // recv a message ws.on('CB:message', (stanza: BinaryNode) => { const { fullMessage: msg, category, author, decryptionTask } = decodeMessageStanza(stanza, authState) @@ -585,6 +607,11 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { ) }) + ws.on('CB:ack,class:message', (node: BinaryNode) => { + handleBadAck(node) + .catch(error => onUnexpectedError(error, 'handling bad ack')) + }) + ev.on('messages.upsert', data => { handleUpsertedMessages(data) .catch( diff --git a/src/Socket/messages-send.ts b/src/Socket/messages-send.ts index b7b847a..23895e9 100644 --- a/src/Socket/messages-send.ts +++ b/src/Socket/messages-send.ts @@ -141,15 +141,20 @@ export const makeMessagesSocket = (config: SocketConfig) => { } } - const getUSyncDevices = async(jids: string[], ignoreZeroDevices: boolean) => { + /** Fetch all the devices we've to send a message to */ + const getUSyncDevices = async(jids: string[], useCache: boolean, ignoreZeroDevices: boolean) => { const deviceResults: JidWithDevice[] = [] + if(!useCache) { + logger.debug('not using cache for devices') + } + const users: BinaryNode[] = [] jids = Array.from(new Set(jids)) for(let jid of jids) { const user = jidDecode(jid).user jid = jidNormalizedUser(jid) - if(userDevicesCache.has(user)) { + if(userDevicesCache.has(user) && useCache) { const devices: JidWithDevice[] = userDevicesCache.get(user) deviceResults.push(...devices) @@ -285,7 +290,7 @@ export const makeMessagesSocket = (config: SocketConfig) => { const relayMessage = async( jid: string, message: proto.IMessage, - { messageId: msgId, participant, additionalAttributes, cachedGroupMetadata }: MessageRelayOptions + { messageId: msgId, participant, additionalAttributes, useUserDevicesCache, cachedGroupMetadata }: MessageRelayOptions ) => { const meId = authState.creds.me!.id @@ -294,6 +299,7 @@ export const makeMessagesSocket = (config: SocketConfig) => { const { user, server } = jidDecode(jid) const isGroup = server === 'g.us' msgId = msgId || generateMessageID() + useUserDevicesCache = useUserDevicesCache !== false const encodedMsg = encodeWAMessage(message) const participants: BinaryNode[] = [] @@ -345,7 +351,7 @@ export const makeMessagesSocket = (config: SocketConfig) => { if(!participant) { const participantsList = groupData.participants.map(p => p.id) - const additionalDevices = await getUSyncDevices(participantsList, false) + const additionalDevices = await getUSyncDevices(participantsList, useUserDevicesCache, false) devices.push(...additionalDevices) } @@ -401,7 +407,7 @@ export const makeMessagesSocket = (config: SocketConfig) => { devices.push({ user }) devices.push({ user: meUser }) - const additionalDevices = await getUSyncDevices([ meId, jid ], true) + const additionalDevices = await getUSyncDevices([ meId, jid ], useUserDevicesCache, true) devices.push(...additionalDevices) } diff --git a/src/Types/Message.ts b/src/Types/Message.ts index 441b8a4..dfb70a1 100644 --- a/src/Types/Message.ts +++ b/src/Types/Message.ts @@ -93,7 +93,7 @@ export type AnyMediaMessageContent = ( seconds?: number } | ({ sticker: WAMediaUpload - isAnimated?: boolean + isAnimated?: boolean } & WithDimensions) | ({ document: WAMediaUpload mimetype: string @@ -153,6 +153,8 @@ export type MessageRelayOptions = MinimalRelayOptions & { participant?: string /** additional attributes to add to the WA binary node */ additionalAttributes?: { [_: string]: string } + /** should we use the devices cache, or fetch afresh from the server; default assumed to be "true" */ + useUserDevicesCache?: boolean } export type MiscMessageGenerationOptions = MinimalRelayOptions & {