mirror of
https://github.com/FranP-code/Baileys.git
synced 2025-10-13 00:32:22 +00:00
PDO protocol (peer data operation): Get more history sync + better message retry mechanism (#919)
* feat(feature/pdo-sync): initial commit * feat(feature/pdo-sync): Moved to conventional send functions, exported, patched some errors * fix(feature/pdo-sync): Linting and more bugsquatting * chore(feature/pdo-sync): linting done * feat/fix(feat/pdo-sync): Newsletter decrypt + ack * merge (#946) * fix: profilePictureUrl (#901) * Update module to latest version (#926) * Update package.json Update the module to the latest * Add files via upload * Fix: Readme use upsert events (#908) * Fix: getUSyncDevices (#862) * Update messages-send.ts * Update messages-send.ts * Update messages-send.ts * Fix lint * Fix lint * fix(master): update linting workflow to node 20 (current LTS) --------- Co-authored-by: Akhlaqul Muhammad Fadwa <75623219+zennn08@users.noreply.github.com> Co-authored-by: Rizz2Dev <muhamad.rizki27483@smp.belajar.id> Co-authored-by: Oscar Guindzberg <oscar.guindzberg@gmail.com> Co-authored-by: Bob <115008575+bobslavtriev@users.noreply.github.com> * chore(feature/pdo-sync): final linting * fix(feature/pdo-sync): make replies optional * feat(feat/pdo-sync): add <unavailable> handle * feat(feature/pdo-sync): Fixed the issues with peer messages and implemented some more logic * fix(feature/pdo-sync): Make progress optional * fix(feature/pdo-sync): Nullify and defeat Message absent from node if it is resolved immediately * feat(feature/pdo-sync): Export message absent from node and export PDO request ID with it --------- Co-authored-by: Akhlaqul Muhammad Fadwa <75623219+zennn08@users.noreply.github.com> Co-authored-by: Rizz2Dev <muhamad.rizki27483@smp.belajar.id> Co-authored-by: Oscar Guindzberg <oscar.guindzberg@gmail.com> Co-authored-by: Bob <115008575+bobslavtriev@users.noreply.github.com>
This commit is contained in:
@@ -10,6 +10,7 @@ import {
|
||||
aesEncryptGCM,
|
||||
Curve,
|
||||
decodeMediaRetryNode,
|
||||
decodeMessageNode,
|
||||
decryptMessageNode,
|
||||
delay,
|
||||
derivePairingCodeKey,
|
||||
@@ -19,6 +20,7 @@ import {
|
||||
getHistoryMsg,
|
||||
getNextPreKeys,
|
||||
getStatusFromReceiptType, hkdf,
|
||||
NO_MESSAGE_FOUND_ERROR_TEXT,
|
||||
unixTimestampSeconds,
|
||||
xmppPreKey,
|
||||
xmppSignedPreKey
|
||||
@@ -65,6 +67,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
||||
relayMessage,
|
||||
sendReceipt,
|
||||
uploadPreKeys,
|
||||
sendPeerDataOperationMessage,
|
||||
} = sock
|
||||
|
||||
/** this mutex ensures that each retryRequest will wait for the previous one to finish */
|
||||
@@ -79,6 +82,11 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
||||
useClones: false
|
||||
})
|
||||
|
||||
const placeholderResendCache = config.placeholderResendCache || new NodeCache({
|
||||
stdTTL: DEFAULT_CACHE_TTLS.MSG_RETRY, // 1 hour
|
||||
useClones: false
|
||||
})
|
||||
|
||||
let sendActiveReceipts = false
|
||||
|
||||
const sendMessageAck = async({ tag, attrs, content }: BinaryNode) => {
|
||||
@@ -99,14 +107,13 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
||||
stanza.attrs.recipient = attrs.recipient
|
||||
}
|
||||
|
||||
if(!!attrs.type && (tag !== 'message' || getBinaryNodeChild({ tag, attrs, content }, 'unavailable'))) {
|
||||
stanza.attrs.type = attrs.type
|
||||
}
|
||||
|
||||
if(!!attrs.type && (tag !== 'message' || getBinaryNodeChild({ tag, attrs, content }, 'unavailable'))) {
|
||||
stanza.attrs.type = attrs.type
|
||||
}
|
||||
|
||||
if(tag === 'message' && getBinaryNodeChild({ tag, attrs, content }, 'unavailable')) {
|
||||
stanza.attrs.from = authState.creds.me!.id
|
||||
}
|
||||
if(tag === 'message' && getBinaryNodeChild({ tag, attrs, content }, 'unavailable')) {
|
||||
stanza.attrs.from = authState.creds.me!.id
|
||||
}
|
||||
|
||||
logger.debug({ recv: { tag, attrs }, sent: stanza.attrs }, 'sent ack')
|
||||
await sendNode(stanza)
|
||||
@@ -133,9 +140,11 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
||||
}
|
||||
|
||||
const sendRetryRequest = async(node: BinaryNode, forceIncludeKeys = false) => {
|
||||
const { id: msgId, participant } = node.attrs
|
||||
const { fullMessage } = decodeMessageNode(node, authState.creds.me!.id, authState.creds.me!.lid || '')
|
||||
const { key: msgKey } = fullMessage
|
||||
const msgId = msgKey.id!
|
||||
|
||||
const key = `${msgId}:${participant}`
|
||||
const key = `${msgId}:${msgKey?.participant}`
|
||||
let retryCount = msgRetryCache.get<number>(key) || 0
|
||||
if(retryCount >= maxMsgRetryCount) {
|
||||
logger.debug({ retryCount, msgId }, 'reached retry limit, clearing')
|
||||
@@ -148,6 +157,12 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
||||
|
||||
const { account, signedPreKey, signedIdentityKey: identityKey } = authState.creds
|
||||
|
||||
if(retryCount === 1) {
|
||||
//request a resend via phone
|
||||
const msgId = await requestPlaceholderResend(msgKey)
|
||||
logger.debug(`sendRetryRequest: requested placeholder resend for message ${msgId}`)
|
||||
}
|
||||
|
||||
const deviceIdentity = encodeSignedDeviceIdentity(account!, true)
|
||||
await authState.keys.transaction(
|
||||
async() => {
|
||||
@@ -699,12 +714,30 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
||||
}
|
||||
|
||||
const handleMessage = async(node: BinaryNode) => {
|
||||
if(shouldIgnoreJid(node.attrs.from!) && node.attrs.from! !== '@s.whatsapp.net') {
|
||||
if(shouldIgnoreJid(node.attrs.from) && node.attrs.from !== '@s.whatsapp.net') {
|
||||
logger.debug({ key: node.attrs.key }, 'ignored message')
|
||||
await sendMessageAck(node)
|
||||
return
|
||||
}
|
||||
|
||||
let response: string | undefined
|
||||
|
||||
if(getBinaryNodeChild(node, 'unavailable') && !getBinaryNodeChild(node, 'enc')) {
|
||||
await sendMessageAck(node)
|
||||
const { key } = decodeMessageNode(node, authState.creds.me!.id, authState.creds.me!.lid || '').fullMessage
|
||||
response = await requestPlaceholderResend(key)
|
||||
if(response === 'RESOLVED') {
|
||||
return
|
||||
}
|
||||
|
||||
logger.debug('received unavailable message, acked and requested resend from phone')
|
||||
} else {
|
||||
if(placeholderResendCache.get(node.attrs.id)) {
|
||||
placeholderResendCache.del(node.attrs.id)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
const { fullMessage: msg, category, author, decrypt } = decryptMessageNode(
|
||||
node,
|
||||
authState.creds.me!.id,
|
||||
@@ -713,6 +746,10 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
||||
logger,
|
||||
)
|
||||
|
||||
if(response && msg?.messageStubParameters?.[0] === NO_MESSAGE_FOUND_ERROR_TEXT) {
|
||||
msg.messageStubParameters = [NO_MESSAGE_FOUND_ERROR_TEXT, response]
|
||||
}
|
||||
|
||||
if(msg.message?.protocolMessage?.type === proto.Message.ProtocolMessage.Type.SHARE_PHONE_NUMBER) {
|
||||
if(node.attrs.sender_pn) {
|
||||
ev.emit('chats.phoneNumberShare', { lid: node.attrs.from, jid: node.attrs.sender_pn })
|
||||
@@ -728,6 +765,10 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
||||
retryMutex.mutex(
|
||||
async() => {
|
||||
if(ws.isOpen) {
|
||||
if(getBinaryNodeChild(node, 'unavailable')) {
|
||||
return
|
||||
}
|
||||
|
||||
const encNode = getBinaryNodeChild(node, 'enc')
|
||||
await sendRetryRequest(node, !encNode)
|
||||
if(retryRequestDelayMs) {
|
||||
@@ -773,6 +814,65 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
||||
])
|
||||
}
|
||||
|
||||
const fetchMessageHistory = async(
|
||||
count: number,
|
||||
oldestMsgKey: WAMessageKey,
|
||||
oldestMsgTimestamp: number | Long
|
||||
): Promise<string> => {
|
||||
if(!authState.creds.me?.id) {
|
||||
throw new Boom('Not authenticated')
|
||||
}
|
||||
|
||||
const pdoMessage = {
|
||||
historySyncOnDemandRequest: {
|
||||
chatJid: oldestMsgKey.remoteJid,
|
||||
oldestMsgFromMe: oldestMsgKey.fromMe,
|
||||
oldestMsgId: oldestMsgKey.id,
|
||||
oldestMsgTimestampMs: oldestMsgTimestamp,
|
||||
onDemandMsgCount: count
|
||||
},
|
||||
peerDataOperationRequestType: proto.Message.PeerDataOperationRequestType.HISTORY_SYNC_ON_DEMAND
|
||||
}
|
||||
|
||||
return sendPeerDataOperationMessage(pdoMessage)
|
||||
}
|
||||
|
||||
const requestPlaceholderResend = async(messageKey: WAMessageKey): Promise<'RESOLVED'| string | undefined> => {
|
||||
if(!authState.creds.me?.id) {
|
||||
throw new Boom('Not authenticated')
|
||||
}
|
||||
|
||||
if(placeholderResendCache.get(messageKey?.id!)) {
|
||||
logger.debug('already requested resend', { messageKey })
|
||||
return
|
||||
} else {
|
||||
placeholderResendCache.set(messageKey?.id!, true)
|
||||
}
|
||||
|
||||
await delay(5000)
|
||||
|
||||
if(!placeholderResendCache.get(messageKey?.id!)) {
|
||||
logger.debug('message received while resend requested', { messageKey })
|
||||
return 'RESOLVED'
|
||||
}
|
||||
|
||||
const pdoMessage = {
|
||||
placeholderMessageResendRequest: [{
|
||||
messageKey
|
||||
}],
|
||||
peerDataOperationRequestType: proto.Message.PeerDataOperationRequestType.PLACEHOLDER_MESSAGE_RESEND
|
||||
}
|
||||
|
||||
setTimeout(() => {
|
||||
if(placeholderResendCache.get(messageKey?.id!)) {
|
||||
logger.debug('PDO message without response after 15 seconds. Phone possibly offline', { messageKey })
|
||||
placeholderResendCache.del(messageKey?.id!)
|
||||
}
|
||||
}, 15_000)
|
||||
|
||||
return sendPeerDataOperationMessage(pdoMessage)
|
||||
}
|
||||
|
||||
const handleCall = async(node: BinaryNode) => {
|
||||
const { attrs } = node
|
||||
const [infoChild] = getAllBinaryNodeChildren(node)
|
||||
@@ -925,6 +1025,8 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
||||
...sock,
|
||||
sendMessageAck,
|
||||
sendRetryRequest,
|
||||
rejectCall
|
||||
rejectCall,
|
||||
fetchMessageHistory,
|
||||
requestPlaceholderResend,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user