From 1f9cfb1cbaedca481375b10ef5ccb85ed5cf958b Mon Sep 17 00:00:00 2001 From: Rajeh Taher Date: Wed, 14 Aug 2024 12:07:27 +0300 Subject: [PATCH] PDO protocol (peer data operation): Get more history sync + better message retry mechanism (#919) * feat(feature/pdo-sync): initial commit * feat(feature/pdo-sync): Moved to conventional send functions, exported, patched some errors * fix(feature/pdo-sync): Linting and more bugsquatting * chore(feature/pdo-sync): linting done * feat/fix(feat/pdo-sync): Newsletter decrypt + ack * merge (#946) * fix: profilePictureUrl (#901) * Update module to latest version (#926) * Update package.json Update the module to the latest * Add files via upload * Fix: Readme use upsert events (#908) * Fix: getUSyncDevices (#862) * Update messages-send.ts * Update messages-send.ts * Update messages-send.ts * Fix lint * Fix lint * fix(master): update linting workflow to node 20 (current LTS) --------- Co-authored-by: Akhlaqul Muhammad Fadwa <75623219+zennn08@users.noreply.github.com> Co-authored-by: Rizz2Dev Co-authored-by: Oscar Guindzberg Co-authored-by: Bob <115008575+bobslavtriev@users.noreply.github.com> * chore(feature/pdo-sync): final linting * fix(feature/pdo-sync): make replies optional * feat(feat/pdo-sync): add handle * feat(feature/pdo-sync): Fixed the issues with peer messages and implemented some more logic * fix(feature/pdo-sync): Make progress optional * fix(feature/pdo-sync): Nullify and defeat Message absent from node if it is resolved immediately * feat(feature/pdo-sync): Export message absent from node and export PDO request ID with it --------- Co-authored-by: Akhlaqul Muhammad Fadwa <75623219+zennn08@users.noreply.github.com> Co-authored-by: Rizz2Dev Co-authored-by: Oscar Guindzberg Co-authored-by: Bob <115008575+bobslavtriev@users.noreply.github.com> --- Example/example.ts | 79 +++++++++++++++++-- src/Defaults/index.ts | 3 +- src/Socket/chats.ts | 13 +++- src/Socket/messages-recv.ts | 124 +++++++++++++++++++++++++++--- src/Socket/messages-send.ts | 61 ++++++++++++--- src/Store/make-in-memory-store.ts | 8 +- src/Types/Events.ts | 7 +- src/Types/Socket.ts | 2 + src/Utils/decode-wa-message.ts | 25 +++--- src/Utils/history.ts | 3 + src/Utils/process-message.ts | 49 ++++++++---- src/WABinary/jid-utils.ts | 2 + 12 files changed, 316 insertions(+), 60 deletions(-) diff --git a/Example/example.ts b/Example/example.ts index a78eb36..66ab302 100644 --- a/Example/example.ts +++ b/Example/example.ts @@ -1,16 +1,17 @@ import { Boom } from '@hapi/boom' import NodeCache from 'node-cache' import readline from 'readline' -import makeWASocket, { AnyMessageContent, BinaryInfo, delay, DisconnectReason, encodeWAM, fetchLatestBaileysVersion, getAggregateVotesInPollMessage, makeCacheableSignalKeyStore, makeInMemoryStore, PHONENUMBER_MCC, proto, useMultiFileAuthState, WAMessageContent, WAMessageKey } from '../src' -import MAIN_LOGGER from '../src/Utils/logger' +import makeWASocket, { AnyMessageContent, BinaryInfo, delay, DisconnectReason, downloadAndProcessHistorySyncNotification, encodeWAM, fetchLatestBaileysVersion, getAggregateVotesInPollMessage, getHistoryMsg, isJidNewsletter, makeCacheableSignalKeyStore, makeInMemoryStore, PHONENUMBER_MCC, proto, useMultiFileAuthState, WAMessageContent, WAMessageKey } from '../src' +//import MAIN_LOGGER from '../src/Utils/logger' import open from 'open' import fs from 'fs' +import P from 'pino' -const logger = MAIN_LOGGER.child({}) +const logger = P({ timestamp: () => `,"time":"${new Date().toJSON()}"` }, P.destination('./wa-logs.txt')) logger.level = 'trace' const useStore = !process.argv.includes('--no-store') -const doReplies = !process.argv.includes('--no-reply') +const doReplies = process.argv.includes('--do-reply') const usePairingCode = process.argv.includes('--use-pairing-code') const useMobile = process.argv.includes('--mobile') @@ -18,6 +19,8 @@ const useMobile = process.argv.includes('--mobile') // keep this out of the socket itself, so as to prevent a message decryption/encryption loop across socket restarts const msgRetryCounterCache = new NodeCache() +const onDemandMap = new Map() + // Read line interface const rl = readline.createInterface({ input: process.stdin, output: process.stdout }) const question = (text: string) => new Promise((resolve) => rl.question(text, resolve)) @@ -231,8 +234,11 @@ const startSock = async() => { // history received if(events['messaging-history.set']) { - const { chats, contacts, messages, isLatest } = events['messaging-history.set'] - console.log(`recv ${chats.length} chats, ${contacts.length} contacts, ${messages.length} msgs (is latest: ${isLatest})`) + const { chats, contacts, messages, isLatest, progress, syncType } = events['messaging-history.set'] + if (syncType === proto.HistorySync.HistorySyncType.ON_DEMAND) { + console.log('received on-demand history sync, messages=', messages) + } + console.log(`recv ${chats.length} chats, ${contacts.length} contacts, ${messages.length} msgs (is latest: ${isLatest}, progress: ${progress}%), type: ${syncType}`) } // received a new message @@ -241,8 +247,65 @@ const startSock = async() => { console.log('recv messages ', JSON.stringify(upsert, undefined, 2)) if(upsert.type === 'notify') { - for(const msg of upsert.messages) { - if(!msg.key.fromMe && doReplies) { + for (const msg of upsert.messages) { + //TODO: More built-in implementation of this + /* if ( + msg.message?.protocolMessage?.type === + proto.Message.ProtocolMessage.Type.HISTORY_SYNC_NOTIFICATION + ) { + const historySyncNotification = getHistoryMsg(msg.message) + if ( + historySyncNotification?.syncType == + proto.HistorySync.HistorySyncType.ON_DEMAND + ) { + const { messages } = + await downloadAndProcessHistorySyncNotification( + historySyncNotification, + {} + ) + + + const chatId = onDemandMap.get( + historySyncNotification!.peerDataRequestSessionId! + ) + + console.log(messages) + + onDemandMap.delete( + historySyncNotification!.peerDataRequestSessionId! + ) + + /* + // 50 messages is the limit imposed by whatsapp + //TODO: Add ratelimit of 7200 seconds + //TODO: Max retries 10 + const messageId = await sock.fetchMessageHistory( + 50, + oldestMessageKey, + oldestMessageTimestamp + ) + onDemandMap.set(messageId, chatId) + } + } */ + + if (msg.message?.conversation || msg.message?.extendedTextMessage?.text) { + const text = msg.message?.conversation || msg.message?.extendedTextMessage?.text + if (text == "requestPlaceholder" && !upsert.requestId) { + const messageId = await sock.requestPlaceholderResend(msg.key) + console.log('requested placeholder resync, id=', messageId) + } else if (upsert.requestId) { + console.log('Message received from phone, id=', upsert.requestId, msg) + } + + // go to an old chat and send this + if (text == "onDemandHistSync") { + const messageId = await sock.fetchMessageHistory(50, msg.key, msg.messageTimestamp!) + console.log('requested on-demand sync, id=', messageId) + } + } + + if(!msg.key.fromMe && doReplies && !isJidNewsletter(msg.key?.remoteJid!)) { + console.log('replying to', msg.key.remoteJid) await sock!.readMessages([msg.key]) await sendMessageWTyping({ text: 'Hello there!' }, msg.key.remoteJid!) diff --git a/src/Defaults/index.ts b/src/Defaults/index.ts index 564581f..72df30e 100644 --- a/src/Defaults/index.ts +++ b/src/Defaults/index.ts @@ -49,7 +49,8 @@ export const PROCESSABLE_HISTORY_TYPES = [ proto.Message.HistorySyncNotification.HistorySyncType.INITIAL_BOOTSTRAP, proto.Message.HistorySyncNotification.HistorySyncType.PUSH_NAME, proto.Message.HistorySyncNotification.HistorySyncType.RECENT, - proto.Message.HistorySyncNotification.HistorySyncType.FULL + proto.Message.HistorySyncNotification.HistorySyncType.FULL, + proto.Message.HistorySyncNotification.HistorySyncType.ON_DEMAND, ] export const DEFAULT_CONNECTION_CONFIG: SocketConfig = { diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index e9ee6df..bb315d0 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -1,6 +1,7 @@ import { Boom } from '@hapi/boom' +import NodeCache from 'node-cache' import { proto } from '../../WAProto' -import { PROCESSABLE_HISTORY_TYPES } from '../Defaults' +import { DEFAULT_CACHE_TTLS, PROCESSABLE_HISTORY_TYPES } from '../Defaults' import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, LTHashState, MessageUpsertType, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAMessage, WAPatchCreate, WAPatchName, WAPresence, WAPrivacyCallValue, WAPrivacyOnlineValue, WAPrivacyValue, WAReadReceiptsValue } from '../Types' import { chatModificationToAppPatch, ChatMutationMap, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, getHistoryMsg, newLTHashState, processSyncAction } from '../Utils' import { makeMutex } from '../Utils/make-mutex' @@ -36,6 +37,15 @@ export const makeChatsSocket = (config: SocketConfig) => { /** this mutex ensures that the notifications (receipts, messages etc.) are processed in order */ const processingMutex = makeMutex() + const placeholderResendCache = config.placeholderResendCache || new NodeCache({ + stdTTL: DEFAULT_CACHE_TTLS.MSG_RETRY, // 1 hour + useClones: false + }) + + if(!config.placeholderResendCache) { + config.placeholderResendCache = placeholderResendCache + } + /** helper function to fetch the given app state sync key */ const getAppStateSyncKey = async(keyId: string) => { const { [keyId]: key } = await authState.keys.get('app-state-sync-key', [keyId]) @@ -876,6 +886,7 @@ export const makeChatsSocket = (config: SocketConfig) => { msg, { shouldProcessHistoryMsg, + placeholderResendCache, ev, creds: authState.creds, keyStore: authState.keys, diff --git a/src/Socket/messages-recv.ts b/src/Socket/messages-recv.ts index 6e64dd6..9507e01 100644 --- a/src/Socket/messages-recv.ts +++ b/src/Socket/messages-recv.ts @@ -10,6 +10,7 @@ import { aesEncryptGCM, Curve, decodeMediaRetryNode, + decodeMessageNode, decryptMessageNode, delay, derivePairingCodeKey, @@ -19,6 +20,7 @@ import { getHistoryMsg, getNextPreKeys, getStatusFromReceiptType, hkdf, + NO_MESSAGE_FOUND_ERROR_TEXT, unixTimestampSeconds, xmppPreKey, xmppSignedPreKey @@ -65,6 +67,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { relayMessage, sendReceipt, uploadPreKeys, + sendPeerDataOperationMessage, } = sock /** this mutex ensures that each retryRequest will wait for the previous one to finish */ @@ -79,6 +82,11 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { useClones: false }) + const placeholderResendCache = config.placeholderResendCache || new NodeCache({ + stdTTL: DEFAULT_CACHE_TTLS.MSG_RETRY, // 1 hour + useClones: false + }) + let sendActiveReceipts = false const sendMessageAck = async({ tag, attrs, content }: BinaryNode) => { @@ -99,14 +107,13 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { stanza.attrs.recipient = attrs.recipient } + if(!!attrs.type && (tag !== 'message' || getBinaryNodeChild({ tag, attrs, content }, 'unavailable'))) { + stanza.attrs.type = attrs.type + } - if(!!attrs.type && (tag !== 'message' || getBinaryNodeChild({ tag, attrs, content }, 'unavailable'))) { - stanza.attrs.type = attrs.type - } - - if(tag === 'message' && getBinaryNodeChild({ tag, attrs, content }, 'unavailable')) { - stanza.attrs.from = authState.creds.me!.id - } + if(tag === 'message' && getBinaryNodeChild({ tag, attrs, content }, 'unavailable')) { + stanza.attrs.from = authState.creds.me!.id + } logger.debug({ recv: { tag, attrs }, sent: stanza.attrs }, 'sent ack') await sendNode(stanza) @@ -133,9 +140,11 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } const sendRetryRequest = async(node: BinaryNode, forceIncludeKeys = false) => { - const { id: msgId, participant } = node.attrs + const { fullMessage } = decodeMessageNode(node, authState.creds.me!.id, authState.creds.me!.lid || '') + const { key: msgKey } = fullMessage + const msgId = msgKey.id! - const key = `${msgId}:${participant}` + const key = `${msgId}:${msgKey?.participant}` let retryCount = msgRetryCache.get(key) || 0 if(retryCount >= maxMsgRetryCount) { logger.debug({ retryCount, msgId }, 'reached retry limit, clearing') @@ -148,6 +157,12 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { const { account, signedPreKey, signedIdentityKey: identityKey } = authState.creds + if(retryCount === 1) { + //request a resend via phone + const msgId = await requestPlaceholderResend(msgKey) + logger.debug(`sendRetryRequest: requested placeholder resend for message ${msgId}`) + } + const deviceIdentity = encodeSignedDeviceIdentity(account!, true) await authState.keys.transaction( async() => { @@ -699,12 +714,30 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } const handleMessage = async(node: BinaryNode) => { - if(shouldIgnoreJid(node.attrs.from!) && node.attrs.from! !== '@s.whatsapp.net') { + if(shouldIgnoreJid(node.attrs.from) && node.attrs.from !== '@s.whatsapp.net') { logger.debug({ key: node.attrs.key }, 'ignored message') await sendMessageAck(node) return } + let response: string | undefined + + if(getBinaryNodeChild(node, 'unavailable') && !getBinaryNodeChild(node, 'enc')) { + await sendMessageAck(node) + const { key } = decodeMessageNode(node, authState.creds.me!.id, authState.creds.me!.lid || '').fullMessage + response = await requestPlaceholderResend(key) + if(response === 'RESOLVED') { + return + } + + logger.debug('received unavailable message, acked and requested resend from phone') + } else { + if(placeholderResendCache.get(node.attrs.id)) { + placeholderResendCache.del(node.attrs.id) + } + } + + const { fullMessage: msg, category, author, decrypt } = decryptMessageNode( node, authState.creds.me!.id, @@ -713,6 +746,10 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { logger, ) + if(response && msg?.messageStubParameters?.[0] === NO_MESSAGE_FOUND_ERROR_TEXT) { + msg.messageStubParameters = [NO_MESSAGE_FOUND_ERROR_TEXT, response] + } + if(msg.message?.protocolMessage?.type === proto.Message.ProtocolMessage.Type.SHARE_PHONE_NUMBER) { if(node.attrs.sender_pn) { ev.emit('chats.phoneNumberShare', { lid: node.attrs.from, jid: node.attrs.sender_pn }) @@ -728,6 +765,10 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { retryMutex.mutex( async() => { if(ws.isOpen) { + if(getBinaryNodeChild(node, 'unavailable')) { + return + } + const encNode = getBinaryNodeChild(node, 'enc') await sendRetryRequest(node, !encNode) if(retryRequestDelayMs) { @@ -773,6 +814,65 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { ]) } + const fetchMessageHistory = async( + count: number, + oldestMsgKey: WAMessageKey, + oldestMsgTimestamp: number | Long + ): Promise => { + if(!authState.creds.me?.id) { + throw new Boom('Not authenticated') + } + + const pdoMessage = { + historySyncOnDemandRequest: { + chatJid: oldestMsgKey.remoteJid, + oldestMsgFromMe: oldestMsgKey.fromMe, + oldestMsgId: oldestMsgKey.id, + oldestMsgTimestampMs: oldestMsgTimestamp, + onDemandMsgCount: count + }, + peerDataOperationRequestType: proto.Message.PeerDataOperationRequestType.HISTORY_SYNC_ON_DEMAND + } + + return sendPeerDataOperationMessage(pdoMessage) + } + + const requestPlaceholderResend = async(messageKey: WAMessageKey): Promise<'RESOLVED'| string | undefined> => { + if(!authState.creds.me?.id) { + throw new Boom('Not authenticated') + } + + if(placeholderResendCache.get(messageKey?.id!)) { + logger.debug('already requested resend', { messageKey }) + return + } else { + placeholderResendCache.set(messageKey?.id!, true) + } + + await delay(5000) + + if(!placeholderResendCache.get(messageKey?.id!)) { + logger.debug('message received while resend requested', { messageKey }) + return 'RESOLVED' + } + + const pdoMessage = { + placeholderMessageResendRequest: [{ + messageKey + }], + peerDataOperationRequestType: proto.Message.PeerDataOperationRequestType.PLACEHOLDER_MESSAGE_RESEND + } + + setTimeout(() => { + if(placeholderResendCache.get(messageKey?.id!)) { + logger.debug('PDO message without response after 15 seconds. Phone possibly offline', { messageKey }) + placeholderResendCache.del(messageKey?.id!) + } + }, 15_000) + + return sendPeerDataOperationMessage(pdoMessage) + } + const handleCall = async(node: BinaryNode) => { const { attrs } = node const [infoChild] = getAllBinaryNodeChildren(node) @@ -925,6 +1025,8 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { ...sock, sendMessageAck, sendRetryRequest, - rejectCall + rejectCall, + fetchMessageHistory, + requestPlaceholderResend, } } diff --git a/src/Socket/messages-send.ts b/src/Socket/messages-send.ts index 8598482..96fc302 100644 --- a/src/Socket/messages-send.ts +++ b/src/Socket/messages-send.ts @@ -264,6 +264,34 @@ export const makeMessagesSocket = (config: SocketConfig) => { return didFetchNewSession } + const sendPeerDataOperationMessage = async( + pdoMessage: proto.Message.IPeerDataOperationRequestMessage + ): Promise => { + //TODO: for later, abstract the logic to send a Peer Message instead of just PDO - useful for App State Key Resync with phone + if(!authState.creds.me?.id) { + throw new Boom('Not authenticated') + } + + const protocolMessage: proto.IMessage = { + protocolMessage: { + peerDataOperationRequestMessage: pdoMessage, + type: proto.Message.ProtocolMessage.Type.PEER_DATA_OPERATION_REQUEST_MESSAGE + } + } + + const meJid = jidNormalizedUser(authState.creds.me.id)! + + const msgId = await relayMessage(meJid, protocolMessage, { + additionalAttributes: { + category: 'peer', + // eslint-disable-next-line camelcase + push_priority: 'high_force', + }, + }) + + return msgId + } + const createParticipantNodes = async( jids: string[], message: proto.IMessage, @@ -436,12 +464,15 @@ export const makeMessagesSocket = (config: SocketConfig) => { if(!participant) { devices.push({ user }) // do not send message to self if the device is 0 (mobile) - if(meDevice !== undefined && meDevice !== 0) { - devices.push({ user: meUser }) - } - const additionalDevices = await getUSyncDevices([ meId, jid ], !!useUserDevicesCache, true) - devices.push(...additionalDevices) + if(!(additionalAttributes?.['category'] === 'peer' && user === meUser)) { + if(meDevice !== undefined && meDevice !== 0) { + devices.push({ user: meUser }) + } + + const additionalDevices = await getUSyncDevices([ meId, jid ], !!useUserDevicesCache, true) + devices.push(...additionalDevices) + } } const allJids: string[] = [] @@ -475,11 +506,18 @@ export const makeMessagesSocket = (config: SocketConfig) => { } if(participants.length) { - binaryNodeContent.push({ - tag: 'participants', - attrs: { }, - content: participants - }) + if(additionalAttributes?.['category'] === 'peer') { + const peerNode = participants[0]?.content?.[0] as BinaryNode + if(peerNode) { + binaryNodeContent.push(peerNode) // push only enc + } + } else { + binaryNodeContent.push({ + tag: 'participants', + attrs: { }, + content: participants + }) + } } const stanza: BinaryNode = { @@ -606,8 +644,9 @@ export const makeMessagesSocket = (config: SocketConfig) => { sendReceipts, readMessages, refreshMediaConn, - waUploadToServer, + waUploadToServer, fetchPrivacySettings, + sendPeerDataOperationMessage, updateMediaMessage: async(message: proto.IWebMessageInfo) => { const content = assertMediaContent(message.message) const mediaKey = content.mediaKey! diff --git a/src/Store/make-in-memory-store.ts b/src/Store/make-in-memory-store.ts index 165fd2b..b96808c 100644 --- a/src/Store/make-in-memory-store.ts +++ b/src/Store/make-in-memory-store.ts @@ -93,8 +93,14 @@ export default (config: BaileysInMemoryStoreConfig) => { chats: newChats, contacts: newContacts, messages: newMessages, - isLatest + isLatest, + syncType }) => { + if(syncType === proto.HistorySync.HistorySyncType.ON_DEMAND) { + return // FOR NOW, + //TODO: HANDLE + } + if(isLatest) { chats.clear() diff --git a/src/Types/Events.ts b/src/Types/Events.ts index e10aad7..35bed25 100644 --- a/src/Types/Events.ts +++ b/src/Types/Events.ts @@ -20,7 +20,9 @@ export type BaileysEventMap = { chats: Chat[] contacts: Contact[] messages: WAMessage[] - isLatest: boolean + isLatest?: boolean + progress?: number | null + syncType?: proto.HistorySync.HistorySyncType } /** upsert chats */ 'chats.upsert': Chat[] @@ -41,8 +43,9 @@ export type BaileysEventMap = { /** * add/update the given messages. If they were received while the connection was online, * the update will have type: "notify" + * if requestId is provided, then the messages was received from the phone due to it being unavailable * */ - 'messages.upsert': { messages: WAMessage[], type: MessageUpsertType } + 'messages.upsert': { messages: WAMessage[], type: MessageUpsertType, requestId?: string } /** message was reacted to. If reaction was removed -- then "reaction.text" will be falsey */ 'messages.reaction': { key: WAMessageKey, reaction: proto.IReaction }[] diff --git a/src/Types/Socket.ts b/src/Types/Socket.ts index ab41b4a..4d6e1c7 100644 --- a/src/Types/Socket.ts +++ b/src/Types/Socket.ts @@ -74,6 +74,8 @@ export type SocketConfig = { userDevicesCache?: CacheStore /** cache to store call offers */ callOfferCache?: CacheStore + /** cache to track placeholder resends */ + placeholderResendCache?: CacheStore /** width for link preview images */ linkPreviewImageThumbnailWidth: number /** Should Baileys ask the phone for full history, will be received async */ diff --git a/src/Utils/decode-wa-message.ts b/src/Utils/decode-wa-message.ts index 3760865..4d7e9b6 100644 --- a/src/Utils/decode-wa-message.ts +++ b/src/Utils/decode-wa-message.ts @@ -2,12 +2,12 @@ import { Boom } from '@hapi/boom' import { Logger } from 'pino' import { proto } from '../../WAProto' import { SignalRepository, WAMessageKey } from '../Types' -import { areJidsSameUser, BinaryNode, isJidBroadcast, isJidGroup, isJidStatusBroadcast, isJidUser, isLidUser } from '../WABinary' -import { BufferJSON, unpadRandomMax16 } from './generics' +import { areJidsSameUser, BinaryNode, isJidBroadcast, isJidGroup, isJidNewsletter, isJidStatusBroadcast, isJidUser, isLidUser } from '../WABinary' +import { unpadRandomMax16 } from './generics' -const NO_MESSAGE_FOUND_ERROR_TEXT = 'Message absent from node' +export const NO_MESSAGE_FOUND_ERROR_TEXT = 'Message absent from node' -type MessageType = 'chat' | 'peer_broadcast' | 'other_broadcast' | 'group' | 'direct_peer_status' | 'other_status' +type MessageType = 'chat' | 'peer_broadcast' | 'other_broadcast' | 'group' | 'direct_peer_status' | 'other_status' | 'newsletter' /** * Decode the received node as a message. @@ -78,12 +78,16 @@ export function decodeMessageNode( chatId = from author = participant + } else if(isJidNewsletter(from)) { + msgType = 'newsletter' + chatId = from + author = from } else { throw new Boom('Unknown message type', { data: stanza }) } const fromMe = (isLidUser(from) ? isMeLid : isMe)(stanza.attrs.participant || stanza.attrs.from) - const pushname = stanza.attrs.notify + const pushname = stanza?.attrs?.notify const key: WAMessageKey = { remoteJid: chatId, @@ -132,7 +136,7 @@ export const decryptMessageNode = ( fullMessage.verifiedBizName = details.verifiedName } - if(tag !== 'enc') { + if(tag !== 'enc' && tag !== 'plaintext') { continue } @@ -145,7 +149,7 @@ export const decryptMessageNode = ( let msgBuffer: Uint8Array try { - const e2eType = attrs.type + const e2eType = tag === 'plaintext' ? 'plaintext' : attrs.type switch (e2eType) { case 'skmsg': msgBuffer = await repository.decryptGroupMessage({ @@ -163,11 +167,14 @@ export const decryptMessageNode = ( ciphertext: content }) break + case 'plaintext': + msgBuffer = content + break default: throw new Error(`Unknown e2e type: ${e2eType}`) } - let msg: proto.IMessage = proto.Message.decode(unpadRandomMax16(msgBuffer)) + let msg: proto.IMessage = proto.Message.decode(e2eType !== 'plaintext' ? unpadRandomMax16(msgBuffer) : msgBuffer) msg = msg.deviceSentMessage?.message || msg if(msg.senderKeyDistributionMessage) { try { @@ -199,7 +206,7 @@ export const decryptMessageNode = ( // if nothing was found to decrypt if(!decryptables) { fullMessage.messageStubType = proto.WebMessageInfo.StubType.CIPHERTEXT - fullMessage.messageStubParameters = [NO_MESSAGE_FOUND_ERROR_TEXT, JSON.stringify(stanza, BufferJSON.replacer)] + fullMessage.messageStubParameters = [NO_MESSAGE_FOUND_ERROR_TEXT] } } } diff --git a/src/Utils/history.ts b/src/Utils/history.ts index b56a395..ad3c693 100644 --- a/src/Utils/history.ts +++ b/src/Utils/history.ts @@ -38,6 +38,7 @@ export const processHistoryMessage = (item: proto.IHistorySync) => { case proto.HistorySync.HistorySyncType.INITIAL_BOOTSTRAP: case proto.HistorySync.HistorySyncType.RECENT: case proto.HistorySync.HistorySyncType.FULL: + case proto.HistorySync.HistorySyncType.ON_DEMAND: for(const chat of item.conversations! as Chat[]) { contacts.push({ id: chat.id, name: chat.name || undefined }) @@ -93,6 +94,8 @@ export const processHistoryMessage = (item: proto.IHistorySync) => { chats, contacts, messages, + syncType: item.syncType, + progress: item.progress, } } diff --git a/src/Utils/process-message.ts b/src/Utils/process-message.ts index 49f372f..58714e9 100644 --- a/src/Utils/process-message.ts +++ b/src/Utils/process-message.ts @@ -1,7 +1,7 @@ import { AxiosRequestConfig } from 'axios' import type { Logger } from 'pino' import { proto } from '../../WAProto' -import { AuthenticationCreds, BaileysEventEmitter, Chat, GroupMetadata, ParticipantAction, RequestJoinAction, RequestJoinMethod, SignalKeyStoreWithTransaction, SocketConfig, WAMessageStubType } from '../Types' +import { AuthenticationCreds, BaileysEventEmitter, CacheStore, Chat, GroupMetadata, ParticipantAction, RequestJoinAction, RequestJoinMethod, SignalKeyStoreWithTransaction, SocketConfig, WAMessageStubType } from '../Types' import { getContentType, normalizeMessageContent } from '../Utils/messages' import { areJidsSameUser, isJidBroadcast, isJidStatusBroadcast, jidNormalizedUser } from '../WABinary' import { aesDecryptGCM, hmacSign } from './crypto' @@ -10,6 +10,7 @@ import { downloadAndProcessHistorySyncNotification } from './history' type ProcessMessageContext = { shouldProcessHistoryMsg: boolean + placeholderResendCache?: CacheStore creds: AuthenticationCreds keyStore: SignalKeyStoreWithTransaction ev: BaileysEventEmitter @@ -33,7 +34,7 @@ const REAL_MSG_REQ_ME_STUB_TYPES = new Set([ export const cleanMessage = (message: proto.IWebMessageInfo, meId: string) => { // ensure remoteJid and participant doesn't have device or agent in it message.key.remoteJid = jidNormalizedUser(message.key.remoteJid!) - message.key.participant = message.key.participant ? jidNormalizedUser(message.key.participant!) : undefined + message.key.participant = message.key.participant ? jidNormalizedUser(message.key.participant) : undefined const content = normalizeMessageContent(message.message) // if the message has a reaction, ensure fromMe & remoteJid are from our perspective if(content?.reactionMessage) { @@ -152,6 +153,7 @@ const processMessage = async( message: proto.IWebMessageInfo, { shouldProcessHistoryMsg, + placeholderResendCache, ev, creds, keyStore, @@ -190,7 +192,7 @@ const processMessage = async( if(protocolMsg) { switch (protocolMsg.type) { case proto.Message.ProtocolMessage.Type.HISTORY_SYNC_NOTIFICATION: - const histNotification = protocolMsg!.historySyncNotification! + const histNotification = protocolMsg.historySyncNotification! const process = shouldProcessHistoryMsg const isLatest = !creds.processedHistoryMessages?.length @@ -202,19 +204,27 @@ const processMessage = async( }, 'got history notification') if(process) { - ev.emit('creds.update', { - processedHistoryMessages: [ - ...(creds.processedHistoryMessages || []), - { key: message.key, messageTimestamp: message.messageTimestamp } - ] - }) + if(histNotification.syncType !== proto.HistorySync.HistorySyncType.ON_DEMAND) { + ev.emit('creds.update', { + processedHistoryMessages: [ + ...(creds.processedHistoryMessages || []), + { key: message.key, messageTimestamp: message.messageTimestamp } + ] + }) + } const data = await downloadAndProcessHistorySyncNotification( histNotification, options ) - ev.emit('messaging-history.set', { ...data, isLatest }) + ev.emit('messaging-history.set', { + ...data, + isLatest: + histNotification.syncType !== proto.HistorySync.HistorySyncType.ON_DEMAND + ? isLatest + : undefined + }) } break @@ -267,14 +277,21 @@ const processMessage = async( case proto.Message.ProtocolMessage.Type.PEER_DATA_OPERATION_REQUEST_RESPONSE_MESSAGE: const response = protocolMsg.peerDataOperationRequestResponseMessage! if(response) { + placeholderResendCache?.del(response.stanzaId!) + // TODO: IMPLEMENT HISTORY SYNC ETC (sticker uploads etc.). const { peerDataOperationResult } = response for(const result of peerDataOperationResult!) { const { placeholderMessageResendResponse: retryResponse } = result if(retryResponse) { const webMessageInfo = proto.WebMessageInfo.decode(retryResponse.webMessageInfoBytes!) - ev.emit('messages.update', [ - { key: webMessageInfo.key, update: { message: webMessageInfo.message } } - ]) + // wait till another upsert event is available, don't want it to be part of the PDO response message + setTimeout(() => { + ev.emit('messages.upsert', { + messages: [webMessageInfo], + type: 'notify', + requestId: response.stanzaId! + }) + }, 500) } } } @@ -288,10 +305,10 @@ const processMessage = async( } ev.emit('messages.reaction', [{ reaction, - key: content.reactionMessage!.key!, + key: content.reactionMessage?.key!, }]) } else if(message.messageStubType) { - const jid = message.key!.remoteJid! + const jid = message.key?.remoteJid! //let actor = whatsappID (message.participant) let participants: string[] const emitParticipantsUpdate = (action: ParticipantAction) => ( @@ -380,7 +397,7 @@ const processMessage = async( if(pollMsg) { const meIdNormalised = jidNormalizedUser(meId) const pollCreatorJid = getKeyAuthor(creationMsgKey, meIdNormalised) - const voterJid = getKeyAuthor(message.key!, meIdNormalised) + const voterJid = getKeyAuthor(message.key, meIdNormalised) const pollEncKey = pollMsg.messageContextInfo?.messageSecret! try { diff --git a/src/WABinary/jid-utils.ts b/src/WABinary/jid-utils.ts index cb58eef..984c1f6 100644 --- a/src/WABinary/jid-utils.ts +++ b/src/WABinary/jid-utils.ts @@ -54,6 +54,8 @@ export const isJidBroadcast = (jid: string | undefined) => (jid?.endsWith('@broa export const isJidGroup = (jid: string | undefined) => (jid?.endsWith('@g.us')) /** is the jid the status broadcast */ export const isJidStatusBroadcast = (jid: string) => jid === 'status@broadcast' +/** is the jid a newsletter */ +export const isJidNewsletter = (jid: string | undefined) => (jid?.endsWith('@newsletter')) export const jidNormalizedUser = (jid: string | undefined) => { const result = jidDecode(jid)