diff --git a/Example/example.ts b/Example/example.ts index 5548fdb..4ac0e47 100644 --- a/Example/example.ts +++ b/Example/example.ts @@ -75,6 +75,7 @@ const startSock = async() => { sock.ev.on('messages.update', m => console.log(m)) sock.ev.on('message-receipt.update', m => console.log(m)) + sock.ev.on('messages.reaction', m => console.log(m)) sock.ev.on('presence.update', m => console.log(m)) sock.ev.on('chats.update', m => console.log(m)) sock.ev.on('chats.delete', m => console.log(m)) diff --git a/README.md b/README.md index a128ce1..e52df36 100644 --- a/README.md +++ b/README.md @@ -187,17 +187,17 @@ The events are typed up in a type map, as mentioned here: ``` ts -export type BaileysEventMap = { +export type BaileysEventMap = { /** connection state has been updated -- WS closed, opened, connecting etc. */ - 'connection.update': Partial - /** auth credentials updated -- some pre key state, device ID etc. */ - 'creds.update': Partial + 'connection.update': Partial + /** credentials updated -- some metadata, keys or something */ + 'creds.update': Partial /** set chats (history sync), chats are reverse chronologically sorted */ 'chats.set': { chats: Chat[], isLatest: boolean } /** set messages (history sync), messages are reverse chronologically sorted */ 'messages.set': { messages: WAMessage[], isLatest: boolean } /** set contacts (history sync) */ - 'contacts.set': { contacts: Contact[] } + 'contacts.set': { contacts: Contact[], isLatest: boolean } /** upsert chats */ 'chats.upsert': Chat[] /** update the given chats */ @@ -205,21 +205,25 @@ export type BaileysEventMap = { /** delete chats with given ID */ 'chats.delete': string[] /** presence of contact in a chat updated */ - 'presence.update': { id: string, presences: { [participant: string]: PresenceData } } + 'presence.update': { id: string, presences: { [participant: string]: PresenceData } } 'contacts.upsert': Contact[] - 'contacts.update': Partial[] - + 'contacts.update': Partial[] + 'messages.delete': { keys: WAMessageKey[] } | { jid: string, all: true } 'messages.update': WAMessageUpdate[] - /** - * add/update the given messages. If they were received while the connection was online, + 'messages.media-update': { key: WAMessageKey, media?: { ciphertext: Uint8Array, iv: Uint8Array }, error?: Boom }[] + /** + * add/update the given messages. If they were received while the connection was online, * the update will have type: "notify" * */ - 'messages.upsert': { messages: WAMessage[], type: MessageUpdateType } + 'messages.upsert': { messages: WAMessage[], type: MessageUpsertType } + /** message was reacted to. If reaction was removed -- then "reaction.text" will be falsey */ + 'messages.reaction': { key: WAMessageKey, reaction: proto.IReaction }[] - 'message-info.update': MessageInfoUpdate[] + 'message-receipt.update': MessageUserReceiptUpdate[] + 'groups.upsert': GroupMetadata[] 'groups.update': Partial[] /** apply an action to participants in a group */ 'group-participants.update': { id: string, participants: string[], action: ParticipantAction } diff --git a/src/LegacySocket/messages.ts b/src/LegacySocket/messages.ts index 830ffb2..d62ca95 100644 --- a/src/LegacySocket/messages.ts +++ b/src/LegacySocket/messages.ts @@ -1,6 +1,6 @@ import { proto } from '../../WAProto' import { WA_DEFAULT_EPHEMERAL } from '../Defaults' -import { AnyMessageContent, Chat, GroupMetadata, LegacySocketConfig, MediaConnInfo, MessageUpdateType, MessageUserReceipt, MessageUserReceiptUpdate, MiscMessageGenerationOptions, ParticipantAction, WAFlag, WAMessage, WAMessageCursor, WAMessageKey, WAMessageStatus, WAMessageStubType, WAMessageUpdate, WAMetric, WAUrlInfo } from '../Types' +import { AnyMessageContent, Chat, GroupMetadata, LegacySocketConfig, MediaConnInfo, MessageUpsertType, MessageUserReceipt, MessageUserReceiptUpdate, MiscMessageGenerationOptions, ParticipantAction, WAFlag, WAMessage, WAMessageCursor, WAMessageKey, WAMessageStatus, WAMessageStubType, WAMessageUpdate, WAMetric, WAUrlInfo } from '../Types' import { assertMediaContent, downloadMediaMessage, generateWAMessage, getWAUploadToServer, MediaDownloadOptions, normalizeMessageContent, toNumber } from '../Utils' import { areJidsSameUser, BinaryNode, getBinaryNodeMessages, isJidGroup, jidNormalizedUser } from '../WABinary' import makeChatsSocket from './chats' @@ -97,12 +97,12 @@ const makeMessagesSocket = (config: LegacySocketConfig) => { const attrs = response.attrs Object.assign(content, attrs) // update message - ev.emit('messages.upsert', { messages: [message], type: 'replace' }) + ev.emit('messages.update', [{ key: message.key, update: { message: message.message } }]) return message } - const onMessage = (message: WAMessage, type: MessageUpdateType) => { + const onMessage = (message: WAMessage, type: MessageUpsertType) => { const jid = message.key.remoteJid! // store chat updates in this const chatUpdate: Partial = { @@ -145,10 +145,9 @@ const makeMessagesSocket = (config: LegacySocketConfig) => { ...normalizedContent.reactionMessage, key: message.key, } - const operation = normalizedContent.reactionMessage?.text ? 'add' : 'remove' ev.emit( 'messages.reaction', - { reaction, key: normalizedContent.reactionMessage!.key!, operation } + [{ reaction, key: normalizedContent.reactionMessage!.key! }] ) } @@ -335,7 +334,7 @@ const makeMessagesSocket = (config: LegacySocketConfig) => { socketEvents.on ('CB:action,add:update,message', (node: BinaryNode) => { const msgs = getBinaryNodeMessages(node) for(const msg of msgs) { - onMessage(msg, 'replace') + onMessage(msg, 'append') } }) // message status updates diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index f6ecfcd..dec3838 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -1,16 +1,19 @@ import { Boom } from '@hapi/boom' import { proto } from '../../WAProto' -import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, InitialReceivedChatsState, LTHashState, PresenceData, SocketConfig, SyncActionUpdates, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAPatchCreate, WAPatchName, WAPresence } from '../Types' -import { chatModificationToAppPatch, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, newAppStateChunk, newLTHashState, processSyncAction, syncActionUpdatesToEventMap } from '../Utils' +import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, InitialReceivedChatsState, LTHashState, MessageUpsertType, PresenceData, SocketConfig, SyncActionUpdates, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAMessage, WAPatchCreate, WAPatchName, WAPresence } from '../Types' +import { chatModificationToAppPatch, debouncedTimeout, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, isHistoryMsg, newAppStateChunk, newLTHashState, processSyncAction, syncActionUpdatesToEventMap } from '../Utils' import { makeMutex } from '../Utils/make-mutex' +import processMessage from '../Utils/process-message' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary' -import { makeMessagesSocket } from './messages-send' +import { makeSocket } from './socket' const MAX_SYNC_ATTEMPTS = 5 +const APP_STATE_SYNC_TIMEOUT_MS = 10_000 + export const makeChatsSocket = (config: SocketConfig) => { - const { logger, markOnlineOnConnect } = config - const sock = makeMessagesSocket(config) + const { logger, markOnlineOnConnect, treatCiphertextMessagesAsReal, downloadHistory } = config + const sock = makeSocket(config) const { ev, ws, @@ -18,18 +21,60 @@ export const makeChatsSocket = (config: SocketConfig) => { generateMessageTag, sendNode, query, - fetchPrivacySettings, onUnexpectedError, emitEventsFromMap, } = sock + let privacySettings: { [_: string]: string } | undefined + const mutationMutex = makeMutex() + /** this mutex ensures that the notifications (receipts, messages etc.) are processed in order */ + const processingMutex = makeMutex() + /** cache to ensure new history sync events do not have duplicate items */ + const historyCache = new Set() + let recvChats: InitialReceivedChatsState = { } + + const appStateSyncTimeout = debouncedTimeout( + APP_STATE_SYNC_TIMEOUT_MS, + async() => { + logger.info( + { recvChats: Object.keys(recvChats).length }, + 'doing initial app state sync' + ) + if(ws.readyState === ws.OPEN) { + await resyncMainAppState(recvChats) + } + + historyCache.clear() + recvChats = { } + } + ) + /** helper function to fetch the given app state sync key */ const getAppStateSyncKey = async(keyId: string) => { const { [keyId]: key } = await authState.keys.get('app-state-sync-key', [keyId]) return key } + const fetchPrivacySettings = async(force: boolean = false) => { + if(!privacySettings || force) { + const { content } = await query({ + tag: 'iq', + attrs: { + xmlns: 'privacy', + to: S_WHATSAPP_NET, + type: 'get' + }, + content: [ + { tag: 'privacy', attrs: { } } + ] + }) + privacySettings = reduceBinaryNodeToDictionary(content[0] as BinaryNode, 'category') + } + + return privacySettings + } + /** helper function to run a generic IQ query */ const interactiveQuery = async(userNodes: BinaryNode[], queryNode: BinaryNode) => { const result = await query({ @@ -639,6 +684,53 @@ export const makeChatsSocket = (config: SocketConfig) => { ]) } + const processMessageLocal = async(msg: proto.IWebMessageInfo) => { + // process message and emit events + const newEvents = await processMessage( + msg, + { + downloadHistory, + historyCache, + recvChats, + creds: authState.creds, + keyStore: authState.keys, + logger, + treatCiphertextMessagesAsReal + } + ) + + const isAnyHistoryMsg = isHistoryMsg(msg.message) + if(isAnyHistoryMsg) { + // we only want to sync app state once we've all the history + // restart the app state sync timeout + logger.debug('restarting app sync timeout') + appStateSyncTimeout.start() + } + + return newEvents + } + + const upsertMessage = async(msg: WAMessage, type: MessageUpsertType) => { + ev.emit('messages.upsert', { messages: [msg], type }) + + if(!!msg.pushName) { + let jid = msg.key.fromMe ? authState.creds.me!.id : (msg.key.participant || msg.key.remoteJid) + jid = jidNormalizedUser(jid) + + if(!msg.key.fromMe) { + ev.emit('contacts.update', [{ id: jid, notify: msg.pushName, verifiedName: msg.verifiedBizName }]) + } + + // update our pushname too + if(msg.key.fromMe && authState.creds.me?.name !== msg.pushName) { + ev.emit('creds.update', { me: { ...authState.creds.me!, name: msg.pushName! } }) + } + } + + const events = await processMessageLocal(msg) + emitEventsFromMap(events) + } + ws.on('CB:presence', handlePresenceUpdate) ws.on('CB:chatstate', handlePresenceUpdate) @@ -664,17 +756,6 @@ export const makeChatsSocket = (config: SocketConfig) => { } }) - ws.on('CB:notification,type:server_sync', (node: BinaryNode) => { - const update = getBinaryNodeChild(node, 'collection') - if(update) { - const name = update.attrs.name as WAPatchName - mutationMutex.mutex(() => ( - resyncAppState([name], undefined) - .catch(err => logger.error({ trace: err.stack, node }, 'failed to sync state')) - )) - } - }) - ev.on('connection.update', ({ connection }) => { if(connection === 'open') { fireInitQueries() @@ -686,6 +767,10 @@ export const makeChatsSocket = (config: SocketConfig) => { return { ...sock, + mutationMutex, + processingMutex, + fetchPrivacySettings, + upsertMessage, appPatch, sendPresenceUpdate, presenceSubscribe, diff --git a/src/Socket/groups.ts b/src/Socket/groups.ts index 25ef9e4..87506bc 100644 --- a/src/Socket/groups.ts +++ b/src/Socket/groups.ts @@ -2,11 +2,11 @@ import { proto } from '../../WAProto' import { GroupMetadata, ParticipantAction, SocketConfig, WAMessageKey, WAMessageStubType } from '../Types' import { generateMessageID, unixTimestampSeconds } from '../Utils' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidEncode, jidNormalizedUser } from '../WABinary' -import { makeSocket } from './socket' +import { makeChatsSocket } from './chats' export const makeGroupsSocket = (config: SocketConfig) => { - const sock = makeSocket(config) - const { authState, ev, query } = sock + const sock = makeChatsSocket(config) + const { authState, ev, query, upsertMessage } = sock const groupQuery = async(jid: string, type: 'get' | 'set', content: BinaryNode[]) => ( query({ @@ -175,25 +175,23 @@ export const makeGroupsSocket = (config: SocketConfig) => { } // generate the group add message - ev.emit('messages.upsert', { - messages: [ - { - key: { - remoteJid: inviteMessage.groupJid, - id: generateMessageID(), - fromMe: false, - participant: key.remoteJid, - }, - messageStubType: WAMessageStubType.GROUP_PARTICIPANT_ADD, - messageStubParameters: [ - authState.creds.me!.id - ], + await upsertMessage( + { + key: { + remoteJid: inviteMessage.groupJid, + id: generateMessageID(), + fromMe: false, participant: key.remoteJid, - messageTimestamp: unixTimestampSeconds() - } - ], - type: 'notify' - }) + }, + messageStubType: WAMessageStubType.GROUP_PARTICIPANT_ADD, + messageStubParameters: [ + authState.creds.me!.id + ], + participant: key.remoteJid, + messageTimestamp: unixTimestampSeconds() + }, + 'notify' + ) return results.attrs.from }, diff --git a/src/Socket/messages-recv.ts b/src/Socket/messages-recv.ts index ce82a59..278be3f 100644 --- a/src/Socket/messages-recv.ts +++ b/src/Socket/messages-recv.ts @@ -1,65 +1,40 @@ import { proto } from '../../WAProto' import { KEY_BUNDLE_TYPE, MIN_PREKEY_COUNT } from '../Defaults' -import { BaileysEventMap, InitialReceivedChatsState, MessageReceiptType, MessageRelayOptions, MessageUserReceipt, SocketConfig, WACallEvent, WAMessageKey, WAMessageStubType } from '../Types' -import { debouncedTimeout, decodeMediaRetryNode, decodeMessageStanza, delay, encodeBigEndian, generateSignalPubKey, getCallStatusFromNode, getNextPreKeys, getStatusFromReceiptType, normalizeMessageContent, unixTimestampSeconds, xmppPreKey, xmppSignedPreKey } from '../Utils' -import { makeKeyedMutex, makeMutex } from '../Utils/make-mutex' -import processMessage, { cleanMessage } from '../Utils/process-message' +import { MessageReceiptType, MessageRelayOptions, MessageUserReceipt, SocketConfig, WACallEvent, WAMessageKey, WAMessageStubType, WAPatchName } from '../Types' +import { decodeMediaRetryNode, decodeMessageStanza, delay, encodeBigEndian, generateSignalPubKey, getCallStatusFromNode, getNextPreKeys, getStatusFromReceiptType, isHistoryMsg, unixTimestampSeconds, xmppPreKey, xmppSignedPreKey } from '../Utils' +import { makeMutex } from '../Utils/make-mutex' +import { cleanMessage } from '../Utils/process-message' import { areJidsSameUser, BinaryNode, BinaryNodeAttributes, getAllBinaryNodeChildren, getBinaryNodeChild, getBinaryNodeChildren, isJidGroup, isJidUser, jidDecode, jidEncode, jidNormalizedUser, S_WHATSAPP_NET } from '../WABinary' -import { makeChatsSocket } from './chats' import { extractGroupMetadata } from './groups' - -const APP_STATE_SYNC_TIMEOUT_MS = 10_000 +import { makeMessagesSocket } from './messages-send' export const makeMessagesRecvSocket = (config: SocketConfig) => { const { logger, - treatCiphertextMessagesAsReal, retryRequestDelayMs, - downloadHistory, getMessage } = config - const sock = makeChatsSocket(config) + const sock = makeMessagesSocket(config) const { ev, authState, ws, + mutationMutex, + processingMutex, + upsertMessage, + resyncAppState, onUnexpectedError, assertSessions, sendNode, relayMessage, sendReceipt, - resyncMainAppState, - emitEventsFromMap, uploadPreKeys, } = sock - /** this mutex ensures that the notifications (receipts, messages etc.) are processed in order */ - const processingMutex = makeKeyedMutex() - /** this mutex ensures that each retryRequest will wait for the previous one to finish */ const retryMutex = makeMutex() - /** cache to ensure new history sync events do not have duplicate items */ - const historyCache = new Set() - let recvChats: InitialReceivedChatsState = { } - - const appStateSyncTimeout = debouncedTimeout( - APP_STATE_SYNC_TIMEOUT_MS, - async() => { - logger.info( - { recvChats: Object.keys(recvChats).length }, - 'doing initial app state sync' - ) - if(ws.readyState === ws.OPEN) { - await resyncMainAppState(recvChats) - } - - historyCache.clear() - recvChats = { } - } - ) - const msgRetryMap = config.msgRetryCounterMap || { } const callOfferData: { [id: string]: WACallEvent } = { } @@ -169,37 +144,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { ) } - const processMessageLocal = async(msg: proto.IWebMessageInfo) => { - // process message and emit events - const newEvents = await processMessage( - msg, - { - downloadHistory, - historyCache, - recvChats, - creds: authState.creds, - keyStore: authState.keys, - logger, - treatCiphertextMessagesAsReal - } - ) - - // send ack for history message - const normalizedContent = !!msg.message ? normalizeMessageContent(msg.message) : undefined - const isAnyHistoryMsg = !!normalizedContent?.protocolMessage?.historySyncNotification - if(isAnyHistoryMsg) { - // we only want to sync app state once we've all the history - // restart the app state sync timeout - logger.debug('restarting app sync timeout') - appStateSyncTimeout.start() - - const jid = jidEncode(jidDecode(msg.key.remoteJid!).user, 'c.us') - await sendReceipt(jid, undefined, [msg.key.id], 'hist_sync') - } - - return newEvents - } - const handleEncryptNotification = async(node: BinaryNode) => { const from = node.attrs.from if(from === S_WHATSAPP_NET) { @@ -223,7 +167,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } } - const processNotification = (node: BinaryNode) => { + const processNotification = async(node: BinaryNode) => { const result: Partial = { } const [child] = getAllBinaryNodeChildren(node) const nodeType = node.attrs.type @@ -294,13 +238,19 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { const event = decodeMediaRetryNode(node) ev.emit('messages.media-update', [event]) } else if(nodeType === 'encrypt') { - handleEncryptNotification(node) + await handleEncryptNotification(node) } else if(nodeType === 'devices') { const devices = getBinaryNodeChildren(child, 'device') if(areJidsSameUser(child.attrs.jid, authState.creds!.me!.id)) { const deviceJids = devices.map(d => d.attrs.jid) logger.info({ deviceJids }, 'got my own devices') } + } else if(nodeType === 'server_sync') { + const update = getBinaryNodeChild(node, 'collection') + if(update) { + const name = update.attrs.name as WAPatchName + await mutationMutex.mutex(() => resyncAppState([name], undefined)) + } } if(Object.keys(result).length) { @@ -372,191 +322,156 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { participant: attrs.participant } - await processingMutex.mutex( - remoteJid, - async() => { - const status = getStatusFromReceiptType(attrs.type) - if( - typeof status !== 'undefined' && - ( - // basically, we only want to know when a message from us has been delivered to/read by the other person - // or another device of ours has read some messages - status > proto.WebMessageInfo.WebMessageInfoStatus.DELIVERY_ACK || - !isNodeFromMe - ) - ) { - if(isJidGroup(remoteJid)) { - if(attrs.participant) { - const updateKey: keyof MessageUserReceipt = status === proto.WebMessageInfo.WebMessageInfoStatus.DELIVERY_ACK ? 'receiptTimestamp' : 'readTimestamp' + await Promise.all([ + processingMutex.mutex( + async() => { + const status = getStatusFromReceiptType(attrs.type) + if( + typeof status !== 'undefined' && + ( + // basically, we only want to know when a message from us has been delivered to/read by the other person + // or another device of ours has read some messages + status > proto.WebMessageInfo.WebMessageInfoStatus.DELIVERY_ACK || + !isNodeFromMe + ) + ) { + if(isJidGroup(remoteJid)) { + if(attrs.participant) { + const updateKey: keyof MessageUserReceipt = status === proto.WebMessageInfo.WebMessageInfoStatus.DELIVERY_ACK ? 'receiptTimestamp' : 'readTimestamp' + ev.emit( + 'message-receipt.update', + ids.map(id => ({ + key: { ...key, id }, + receipt: { + userJid: jidNormalizedUser(attrs.participant), + [updateKey]: +attrs.t + } + })) + ) + } + } else { ev.emit( - 'message-receipt.update', + 'messages.update', ids.map(id => ({ key: { ...key, id }, - receipt: { - userJid: jidNormalizedUser(attrs.participant), - [updateKey]: +attrs.t - } + update: { status } })) ) } - } else { - ev.emit( - 'messages.update', - ids.map(id => ({ - key: { ...key, id }, - update: { status } - })) - ) } - } - await sendMessageAck(node) - - if(attrs.type === 'retry') { - // correctly set who is asking for the retry - key.participant = key.participant || attrs.from - if(willSendMessageAgain(ids[0], key.participant)) { - if(key.fromMe) { - try { - logger.debug({ attrs, key }, 'recv retry request') - await sendMessagesAgain(key, ids) - } catch(error) { - logger.error({ key, ids, trace: error.stack }, 'error in sending message again') + if(attrs.type === 'retry') { + // correctly set who is asking for the retry + key.participant = key.participant || attrs.from + if(willSendMessageAgain(ids[0], key.participant)) { + if(key.fromMe) { + try { + logger.debug({ attrs, key }, 'recv retry request') + await sendMessagesAgain(key, ids) + } catch(error) { + logger.error({ key, ids, trace: error.stack }, 'error in sending message again') + } + } else { + logger.info({ attrs, key }, 'recv retry for not fromMe message') } } else { - logger.info({ attrs, key }, 'recv retry for not fromMe message') + logger.info({ attrs, key }, 'will not send message again, as sent too many times') } - } else { - logger.info({ attrs, key }, 'will not send message again, as sent too many times') } } - } - ) + ), + sendMessageAck(node) + ]) } const handleNotification = async(node: BinaryNode) => { const remoteJid = node.attrs.from - await sendMessageAck(node) - const msg = processNotification(node) - if(msg) { - const fromMe = areJidsSameUser(node.attrs.participant || remoteJid, authState.creds.me!.id) - msg.key = { - remoteJid, - fromMe, - participant: node.attrs.participant, - id: node.attrs.id, - ...(msg.key || {}) - } - msg.participant = node.attrs.participant - msg.messageTimestamp = +node.attrs.t + await Promise.all([ + processingMutex.mutex( + async() => { + const msg = await processNotification(node) + if(msg) { + const fromMe = areJidsSameUser(node.attrs.participant || remoteJid, authState.creds.me!.id) + msg.key = { + remoteJid, + fromMe, + participant: node.attrs.participant, + id: node.attrs.id, + ...(msg.key || {}) + } + msg.participant = node.attrs.participant + msg.messageTimestamp = +node.attrs.t - const fullMsg = proto.WebMessageInfo.fromObject(msg) - ev.emit('messages.upsert', { messages: [fullMsg], type: 'append' }) - } - } - - const handleUpsertedMessages = async({ messages, type }: BaileysEventMap['messages.upsert']) => { - if(type === 'notify' || type === 'append') { - const contactNameUpdates: { [_: string]: string } = { } - for(const msg of messages) { - const normalizedChatId = jidNormalizedUser(msg.key.remoteJid) - if(!!msg.pushName) { - let jid = msg.key.fromMe ? authState.creds.me!.id : (msg.key.participant || msg.key.remoteJid) - jid = jidNormalizedUser(jid) - - contactNameUpdates[jid] = msg.pushName - // update our pushname too - if(msg.key.fromMe && authState.creds.me?.name !== msg.pushName) { - ev.emit('creds.update', { me: { ...authState.creds.me!, name: msg.pushName! } }) + const fullMsg = proto.WebMessageInfo.fromObject(msg) + await upsertMessage(fullMsg, 'append') } } - - const events = await processingMutex.mutex( - 'p-' + normalizedChatId, - () => processMessageLocal(msg) - ) - emitEventsFromMap(events) - } - - if(Object.keys(contactNameUpdates).length) { - ev.emit('contacts.update', Object.keys(contactNameUpdates).map( - id => ({ id, notify: contactNameUpdates[id] }) - )) - } - } + ), + sendMessageAck(node) + ]) } - const handleBadAck = async({ attrs }: BinaryNode) => { - // current hypothesis is that if pash is sent in the ack - // it means -- the message hasn't reached all devices yet - // we'll retry sending the message here - if(attrs.phash) { - logger.info({ attrs }, 'received phash in ack, resending message...') - const key: WAMessageKey = { remoteJid: attrs.from, fromMe: true, id: attrs.id } - const msg = await getMessage(key) - if(msg) { - await relayMessage(key.remoteJid, msg, { messageId: key.id, useUserDevicesCache: false }) - } else { - logger.warn({ attrs }, 'could not send message again, as it was not found') - } - } - } - - // recv a message - ws.on('CB:message', (stanza: BinaryNode) => { - const { fullMessage: msg, category, author, decryptionTask } = decodeMessageStanza(stanza, authState) - processingMutex.mutex( - msg.key.remoteJid!, - async() => { - await sendMessageAck(stanza) - await decryptionTask - // message failed to decrypt - if(msg.messageStubType === proto.WebMessageInfo.WebMessageInfoStubType.CIPHERTEXT) { - logger.error( - { key: msg.key, params: msg.messageStubParameters }, - 'failure in decrypting message' - ) - retryMutex.mutex( - async() => { - if(ws.readyState === ws.OPEN) { - await sendRetryRequest(stanza) - if(retryRequestDelayMs) { - await delay(retryRequestDelayMs) + const handleMessage = async(node: BinaryNode) => { + const { fullMessage: msg, category, author, decryptionTask } = decodeMessageStanza(node, authState) + await Promise.all([ + processingMutex.mutex( + async() => { + await decryptionTask + // message failed to decrypt + if(msg.messageStubType === proto.WebMessageInfo.WebMessageInfoStubType.CIPHERTEXT) { + logger.error( + { key: msg.key, params: msg.messageStubParameters }, + 'failure in decrypting message' + ) + retryMutex.mutex( + async() => { + if(ws.readyState === ws.OPEN) { + await sendRetryRequest(node) + if(retryRequestDelayMs) { + await delay(retryRequestDelayMs) + } + } else { + logger.debug({ node }, 'connection closed, ignoring retry req') } - } else { - logger.debug({ stanza }, 'connection closed, ignoring retry req') } + ) + } else { + // no type in the receipt => message delivered + let type: MessageReceiptType = undefined + let participant = msg.key.participant + if(category === 'peer') { // special peer message + type = 'peer_msg' + } else if(msg.key.fromMe) { // message was sent by us from a different device + type = 'sender' + // need to specially handle this case + if(isJidUser(msg.key.remoteJid)) { + participant = author + } + } else if(!sendActiveReceipts) { + type = 'inactive' } - ) - } else { - // no type in the receipt => message delivered - let type: MessageReceiptType = undefined - let participant = msg.key.participant - if(category === 'peer') { // special peer message - type = 'peer_msg' - } else if(msg.key.fromMe) { // message was sent by us from a different device - type = 'sender' - // need to specially handle this case - if(isJidUser(msg.key.remoteJid)) { - participant = author + + await sendReceipt(msg.key.remoteJid!, participant, [msg.key.id!], type) + + + // send ack for history message + const isAnyHistoryMsg = isHistoryMsg(msg.message) + if(isAnyHistoryMsg) { + const jid = jidEncode(jidDecode(msg.key.remoteJid!).user, 'c.us') + await sendReceipt(jid, undefined, [msg.key.id], 'hist_sync') } - } else if(!sendActiveReceipts) { - type = 'inactive' } - await sendReceipt(msg.key.remoteJid!, participant, [msg.key.id!], type) + cleanMessage(msg, authState.creds.me!.id) } + ), + sendMessageAck(node) + ]) - cleanMessage(msg, authState.creds.me!.id) - ev.emit('messages.upsert', { messages: [msg], type: stanza.attrs.offline ? 'append' : 'notify' }) - } - ) - .catch( - error => onUnexpectedError(error, 'processing message') - ) - }) + await upsertMessage(msg, node.attrs.offline ? 'append' : 'notify') + } - ws.on('CB:call', async(node: BinaryNode) => { + const handleCall = async(node: BinaryNode) => { const { attrs } = node const [infoChild] = getAllBinaryNodeChildren(node) const callId = infoChild.attrs['call-id'] @@ -591,25 +506,62 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { ev.emit('call', [call]) await sendMessageAck(node, { type: infoChild.tag }) - .catch( - error => onUnexpectedError(error, 'ack call') - ) + } + + const handleBadAck = async({ attrs }: BinaryNode) => { + // current hypothesis is that if pash is sent in the ack + // it means -- the message hasn't reached all devices yet + // we'll retry sending the message here + if(attrs.phash) { + logger.info({ attrs }, 'received phash in ack, resending message...') + const key: WAMessageKey = { remoteJid: attrs.from, fromMe: true, id: attrs.id } + const msg = await getMessage(key) + if(msg) { + await relayMessage(key.remoteJid, msg, { messageId: key.id, useUserDevicesCache: false }) + } else { + logger.warn({ attrs }, 'could not send message again, as it was not found') + } + } + } + + const flushBufferIfLastOfflineNode = ( + node: BinaryNode, + identifier: string, + exec: (node: BinaryNode) => Promise + ) => { + const task = exec(node) + .catch(err => onUnexpectedError(err, identifier)) + const offline = node.attrs.offline + if(offline) { + ev.processInBuffer(task) + } + } + + // called when all offline notifs are handled + ws.on('CB:ib,,offline', (node: BinaryNode) => { + const child = getBinaryNodeChild(node, 'offline') + const offlineNotifs = +child.attrs.count + + logger.info(`handled ${offlineNotifs} offline messages/notifications`) + ev.emit('connection.update', { receivedPendingNotifications: true }) + ev.flush() + }) + + // recv a message + ws.on('CB:message', (node: BinaryNode) => { + flushBufferIfLastOfflineNode(node, 'processing message', handleMessage) + }) + + ws.on('CB:call', async(node: BinaryNode) => { + flushBufferIfLastOfflineNode(node, 'handling call', handleCall) }) ws.on('CB:receipt', node => { - handleReceipt(node) - .catch( - error => onUnexpectedError(error, 'handling receipt') - ) + flushBufferIfLastOfflineNode(node, 'handling receipt', handleReceipt) }) ws.on('CB:notification', async(node: BinaryNode) => { - handleNotification(node) - .catch( - error => { - onUnexpectedError(error, 'handling notification') - } - ) + flushBufferIfLastOfflineNode(node, 'handling notification', handleNotification) }) ws.on('CB:ack,class:message', (node: BinaryNode) => { @@ -617,13 +569,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { .catch(error => onUnexpectedError(error, 'handling bad ack')) }) - ev.on('messages.upsert', data => { - handleUpsertedMessages(data) - .catch( - error => onUnexpectedError(error, 'handling upserted messages') - ) - }) - ev.on('call', ([ call ]) => { // missed call + group call notification message generation if(call.status === 'timeout' || (call.status === 'offer' && call.isGroup)) { @@ -662,7 +607,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { return { ...sock, - processMessage: processMessageLocal, sendMessageAck, sendRetryRequest } diff --git a/src/Socket/messages-send.ts b/src/Socket/messages-send.ts index 502d9a1..ad08fdd 100644 --- a/src/Socket/messages-send.ts +++ b/src/Socket/messages-send.ts @@ -6,7 +6,7 @@ import { WA_DEFAULT_EPHEMERAL } from '../Defaults' import { AnyMessageContent, MediaConnInfo, MessageReceiptType, MessageRelayOptions, MiscMessageGenerationOptions, SocketConfig, WAMessageKey } from '../Types' import { aggregateMessageKeysNotFromMe, assertMediaContent, bindWaitForEvent, decryptMediaRetryData, encodeWAMessage, encryptMediaRetryRequest, encryptSenderKeyMsgSignalProto, encryptSignalProto, extractDeviceJids, generateMessageID, generateWAMessage, getUrlFromDirectPath, getWAUploadToServer, jidToSignalProtocolAddress, parseAndInjectE2ESessions, unixTimestampSeconds } from '../Utils' import { getUrlInfo } from '../Utils/link-preview' -import { areJidsSameUser, BinaryNode, BinaryNodeAttributes, getBinaryNodeChild, getBinaryNodeChildren, isJidGroup, isJidUser, jidDecode, jidEncode, jidNormalizedUser, JidWithDevice, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary' +import { areJidsSameUser, BinaryNode, BinaryNodeAttributes, getBinaryNodeChild, getBinaryNodeChildren, isJidGroup, isJidUser, jidDecode, jidEncode, jidNormalizedUser, JidWithDevice, S_WHATSAPP_NET } from '../WABinary' import { makeGroupsSocket } from './groups' export const makeMessagesSocket = (config: SocketConfig) => { @@ -15,7 +15,9 @@ export const makeMessagesSocket = (config: SocketConfig) => { const { ev, authState, + upsertMessage, query, + fetchPrivacySettings, generateMessageTag, sendNode, groupMetadata, @@ -26,26 +28,6 @@ export const makeMessagesSocket = (config: SocketConfig) => { stdTTL: 300, // 5 minutes useClones: false }) - let privacySettings: { [_: string]: string } | undefined - - const fetchPrivacySettings = async(force: boolean = false) => { - if(!privacySettings || force) { - const { content } = await query({ - tag: 'iq', - attrs: { - xmlns: 'privacy', - to: S_WHATSAPP_NET, - type: 'get' - }, - content: [ - { tag: 'privacy', attrs: { } } - ] - }) - privacySettings = reduceBinaryNodeToDictionary(content[0] as BinaryNode, 'category') - } - - return privacySettings - } let mediaConn: Promise const refreshMediaConn = async(forceGet = false) => { @@ -633,7 +615,7 @@ export const makeMessagesSocket = (config: SocketConfig) => { await relayMessage(jid, fullMsg.message, { messageId: fullMsg.key.id!, cachedGroupMetadata: options.cachedGroupMetadata, additionalAttributes }) if(config.emitOwnEvents) { process.nextTick(() => { - ev.emit('messages.upsert', { messages: [fullMsg], type: 'append' }) + upsertMessage(fullMsg, 'append') }) } diff --git a/src/Socket/socket.ts b/src/Socket/socket.ts index c75ab9a..86f203d 100644 --- a/src/Socket/socket.ts +++ b/src/Socket/socket.ts @@ -6,6 +6,7 @@ import { proto } from '../../WAProto' import { DEF_CALLBACK_PREFIX, DEF_TAG_PREFIX, DEFAULT_ORIGIN, INITIAL_PREKEY_COUNT, MIN_PREKEY_COUNT } from '../Defaults' import { AuthenticationCreds, BaileysEventEmitter, BaileysEventMap, DisconnectReason, SocketConfig } from '../Types' import { addTransactionCapability, bindWaitForConnectionUpdate, configureSuccessfulPairing, Curve, generateLoginNode, generateMdTagPrefix, generateRegistrationNode, getCodeFromWSError, getErrorCodeFromStreamError, getNextPreKeysNode, makeNoiseHandler, printQRIfNecessaryListener, promiseTimeout } from '../Utils' +import { makeEventBuffer } from '../Utils/event-buffer' import { assertNodeErrorFree, BinaryNode, encodeBinaryNode, getBinaryNodeChild, getBinaryNodeChildren, S_WHATSAPP_NET } from '../WABinary' /** @@ -34,7 +35,8 @@ export const makeSocket = ({ agent }) ws.setMaxListeners(0) - const ev = new EventEmitter() as BaileysEventEmitter + const _ev = new EventEmitter() as BaileysEventEmitter + const ev = makeEventBuffer(_ev, logger) /** ephemeral key pair used to encrypt/decrypt communication. Unique for each connection */ const ephemeralKeyPair = Curve.generateKeyPair() /** WA noise protocol wrapper */ @@ -501,15 +503,6 @@ export const makeSocket = ({ ev.emit('connection.update', { connection: 'open' }) }) - ws.on('CB:ib,,offline', (node: BinaryNode) => { - const child = getBinaryNodeChild(node, 'offline') - const offlineCount = +child.attrs.count - - logger.info(`got ${offlineCount} offline messages/notifications`) - - ev.emit('connection.update', { receivedPendingNotifications: true }) - }) - ws.on('CB:stream:error', (node: BinaryNode) => { logger.error({ node }, 'stream errored out') @@ -528,6 +521,8 @@ export const makeSocket = ({ }) process.nextTick(() => { + // start buffering important events + ev.buffer() ev.emit('connection.update', { connection: 'connecting', receivedPendingNotifications: false, qr: undefined }) }) // update credentials when required @@ -572,7 +567,7 @@ export const makeSocket = ({ onUnexpectedError, uploadPreKeys, /** Waits for the connection to WA to reach a state */ - waitForConnectionUpdate: bindWaitForConnectionUpdate(ev) + waitForConnectionUpdate: bindWaitForConnectionUpdate(ev), } } diff --git a/src/Tests/test.app-state-sync.ts b/src/Tests/test.app-state-sync.ts index 55b5898..14f925e 100644 --- a/src/Tests/test.app-state-sync.ts +++ b/src/Tests/test.app-state-sync.ts @@ -1,8 +1,7 @@ import { AccountSettings, ChatMutation, Contact, InitialAppStateSyncOptions } from '../Types' import { unixTimestampSeconds } from '../Utils' -import { processSyncActions } from '../Utils/chat-utils' +import { processSyncAction } from '../Utils/chat-utils' import logger from '../Utils/logger' -import { jidEncode } from '../WABinary' describe('App State Sync Tests', () => { @@ -59,7 +58,7 @@ describe('App State Sync Tests', () => { ] for(const mutations of CASES) { - const events = processSyncActions(mutations, me, undefined, logger) + const events = processSyncAction(mutations, me, undefined, logger) expect(events['chats.update']).toHaveLength(1) const event = events['chats.update']?.[0] expect(event.archive).toEqual(false) @@ -205,8 +204,4 @@ describe('App State Sync Tests', () => { expect(event.archive).toEqual(true) } }) -}) - -function randomJid() { - return jidEncode(Math.floor(Math.random() * 1000000), Math.random() < 0.5 ? 's.whatsapp.net' : 'g.us') -} \ No newline at end of file +}) \ No newline at end of file diff --git a/src/Tests/test.event-buffer.ts b/src/Tests/test.event-buffer.ts new file mode 100644 index 0000000..41acef9 --- /dev/null +++ b/src/Tests/test.event-buffer.ts @@ -0,0 +1,155 @@ +import EventEmitter from 'events' +import { proto } from '../../WAProto' +import { BaileysEventEmitter, Chat, WAMessageKey, WAMessageStatus, WAMessageStubType, WAMessageUpdate } from '../Types' +import { delay, generateMessageID, makeEventBuffer, toNumber, unixTimestampSeconds } from '../Utils' +import logger from '../Utils/logger' +import { randomJid } from './utils' + +describe('Event Buffer Tests', () => { + + const emitter = new EventEmitter() as BaileysEventEmitter + const ev = makeEventBuffer(emitter, logger) + + it('should buffer a chat upsert & update event', async() => { + const chatId = randomJid() + + const chats: Chat[] = [] + + emitter.on('chats.upsert', c => chats.push(...c)) + emitter.on('chats.update', () => fail('should not emit update event')) + + ev.buffer() + ev.processInBuffer((async() => { + await delay(100) + ev.emit('chats.upsert', [{ id: chatId, conversationTimestamp: 123, unreadCount: 1 }]) + })()) + ev.processInBuffer((async() => { + await delay(200) + ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 124, unreadCount: 1 }]) + })()) + + await ev.flush() + + expect(chats).toHaveLength(1) + expect(chats[0].conversationTimestamp).toEqual(124) + expect(chats[0].unreadCount).toEqual(2) + }) + + it('should buffer message upsert events', async() => { + const messageTimestamp = unixTimestampSeconds() + const msg: proto.IWebMessageInfo = { + key: { + remoteJid: randomJid(), + id: generateMessageID(), + fromMe: false + }, + messageStubType: WAMessageStubType.CIPHERTEXT, + messageTimestamp + } + + const msgs: proto.IWebMessageInfo[] = [] + + emitter.on('messages.upsert', c => { + msgs.push(...c.messages) + expect(c.type).toEqual('notify') + }) + + ev.buffer() + ev.emit('messages.upsert', { messages: [proto.WebMessageInfo.fromObject(msg)], type: 'notify' }) + + msg.messageTimestamp = unixTimestampSeconds() + 1 + msg.messageStubType = undefined + msg.message = { conversation: 'Test' } + ev.emit('messages.upsert', { messages: [proto.WebMessageInfo.fromObject(msg)], type: 'notify' }) + ev.emit('messages.update', [{ key: msg.key, update: { status: WAMessageStatus.READ } }]) + + await ev.flush() + + expect(msgs).toHaveLength(1) + expect(msgs[0].message).toBeTruthy() + expect(toNumber(msgs[0].messageTimestamp)).toEqual(messageTimestamp) + expect(msgs[0].status).toEqual(WAMessageStatus.READ) + }) + + it('should buffer a message receipt update', async() => { + const msg: proto.IWebMessageInfo = { + key: { + remoteJid: randomJid(), + id: generateMessageID(), + fromMe: false + }, + messageStubType: WAMessageStubType.CIPHERTEXT, + messageTimestamp: unixTimestampSeconds() + } + + const msgs: proto.IWebMessageInfo[] = [] + + emitter.on('messages.upsert', c => msgs.push(...c.messages)) + emitter.on('message-receipt.update', () => fail('should not emit')) + + ev.buffer() + ev.emit('messages.upsert', { messages: [proto.WebMessageInfo.fromObject(msg)], type: 'notify' }) + ev.emit('message-receipt.update', [ + { + key: msg.key, + receipt: { + userJid: randomJid(), + readTimestamp: unixTimestampSeconds() + } + } + ]) + + await ev.flush() + + expect(msgs).toHaveLength(1) + expect(msgs[0].userReceipt).toHaveLength(1) + }) + + it('should buffer multiple status updates', async() => { + const key: WAMessageKey = { + remoteJid: randomJid(), + id: generateMessageID(), + fromMe: false + } + + const msgs: WAMessageUpdate[] = [] + + emitter.on('messages.update', c => msgs.push(...c)) + + ev.buffer() + ev.emit('messages.update', [{ key, update: { status: WAMessageStatus.DELIVERY_ACK } }]) + ev.emit('messages.update', [{ key, update: { status: WAMessageStatus.READ } }]) + + await ev.flush() + + expect(msgs).toHaveLength(1) + expect(msgs[0].update.status).toEqual(WAMessageStatus.READ) + }) + + it('should remove chat unread counter', async() => { + const msg: proto.IWebMessageInfo = { + key: { + remoteJid: '12345@s.whatsapp.net', + id: generateMessageID(), + fromMe: false + }, + message: { + conversation: 'abcd' + }, + messageTimestamp: unixTimestampSeconds() + } + + const chats: Partial[] = [] + + emitter.on('chats.update', c => chats.push(...c)) + + ev.buffer() + ev.emit('messages.upsert', { messages: [proto.WebMessageInfo.fromObject(msg)], type: 'notify' }) + ev.emit('chats.update', [{ id: msg.key.remoteJid, unreadCount: 1, conversationTimestamp: msg.messageTimestamp }]) + ev.emit('messages.update', [{ key: msg.key, update: { status: WAMessageStatus.READ } }]) + + await ev.flush() + + expect(chats[0].unreadCount).toBeUndefined() + }) +}) \ No newline at end of file diff --git a/src/Tests/utils.ts b/src/Tests/utils.ts new file mode 100644 index 0000000..9d5ce04 --- /dev/null +++ b/src/Tests/utils.ts @@ -0,0 +1,6 @@ +import { jidEncode } from '../WABinary' + + +export function randomJid() { + return jidEncode(Math.floor(Math.random() * 1000000), Math.random() < 0.5 ? 's.whatsapp.net' : 'g.us') +} \ No newline at end of file diff --git a/src/Types/Events.ts b/src/Types/Events.ts index fb393b1..5d064c0 100644 --- a/src/Types/Events.ts +++ b/src/Types/Events.ts @@ -1,12 +1,11 @@ import type { Boom } from '@hapi/boom' -import type EventEmitter from 'events' import { proto } from '../../WAProto' import { AuthenticationCreds } from './Auth' import { WACallEvent } from './Call' import { Chat, PresenceData } from './Chat' import { Contact } from './Contact' import { GroupMetadata, ParticipantAction } from './GroupMetadata' -import { MessageUpdateType, MessageUserReceiptUpdate, WAMessage, WAMessageKey, WAMessageUpdate } from './Message' +import { MessageUpsertType, MessageUserReceiptUpdate, WAMessage, WAMessageKey, WAMessageUpdate } from './Message' import { ConnectionState } from './State' export type BaileysEventMap = { @@ -39,8 +38,9 @@ export type BaileysEventMap = { * add/update the given messages. If they were received while the connection was online, * the update will have type: "notify" * */ - 'messages.upsert': { messages: WAMessage[], type: MessageUpdateType } - 'messages.reaction': { key: WAMessageKey, reaction: proto.IReaction, operation: 'add' | 'remove' } + 'messages.upsert': { messages: WAMessage[], type: MessageUpsertType } + /** message was reacted to. If reaction was removed -- then "reaction.text" will be falsey */ + 'messages.reaction': { key: WAMessageKey, reaction: proto.IReaction }[] 'message-receipt.update': MessageUserReceiptUpdate[] @@ -55,10 +55,26 @@ export type BaileysEventMap = { 'call': WACallEvent[] } -export interface CommonBaileysEventEmitter extends EventEmitter { - on>(event: T, listener: (arg: BaileysEventMap[T]) => void): this - off>(event: T, listener: (arg: BaileysEventMap[T]) => void): this - removeAllListeners>(event: T): this +export type BufferedEventData = { + chatUpserts: { [jid: string]: Chat } + chatUpdates: { [jid: string]: Partial } + chatDeletes: Set + contactUpserts: { [jid: string]: Contact } + contactUpdates: { [jid: string]: Partial } + messageUpserts: { [key: string]: { type: MessageUpsertType, message: WAMessage } } + messageUpdates: { [key: string]: WAMessageUpdate } + messageDeletes: { [key: string]: WAMessageKey } + messageReactions: { [key: string]: { key: WAMessageKey, reactions: proto.IReaction[] } } + messageReceipts: { [key: string]: { key: WAMessageKey, userReceipt: proto.IUserReceipt[] } }, + groupUpdates: { [jid: string]: Partial } +} + +export type BaileysEvent = keyof BaileysEventMap + +export interface CommonBaileysEventEmitter { + on>(event: T, listener: (arg: BaileysEventMap[T]) => void): void + off>(event: T, listener: (arg: BaileysEventMap[T]) => void): void + removeAllListeners>(event: T): void emit>(event: T, arg: BaileysEventMap[T]): boolean } diff --git a/src/Types/Message.ts b/src/Types/Message.ts index dfb70a1..0577027 100644 --- a/src/Types/Message.ts +++ b/src/Types/Message.ts @@ -186,7 +186,12 @@ export type MessageContentGenerationOptions = MediaGenerationOptions & { } export type MessageGenerationOptions = MessageContentGenerationOptions & MessageGenerationOptionsFromContent -export type MessageUpdateType = 'append' | 'notify' | 'replace' +/** + * Type of message upsert + * 1. notify => notify the user, this message was just received + * 2. append => append the message to the chat history, no notification required + */ +export type MessageUpsertType = 'append' | 'notify' export type MessageUserReceipt = proto.IUserReceipt diff --git a/src/Utils/event-buffer.ts b/src/Utils/event-buffer.ts new file mode 100644 index 0000000..afc1c89 --- /dev/null +++ b/src/Utils/event-buffer.ts @@ -0,0 +1,362 @@ +import { Logger } from 'pino' +import { proto } from '../../WAProto' +import { AuthenticationCreds, BaileysEvent, BaileysEventEmitter, BaileysEventMap, BufferedEventData, Chat, Contact, WAMessage, WAMessageStatus } from '../Types' +import { updateMessageWithReaction, updateMessageWithReceipt } from './messages' +import { isRealMessage, shouldIncrementChatUnread } from './process-message' + +const BUFFERABLE_EVENT = [ + 'chats.upsert', + 'chats.update', + 'chats.delete', + 'contacts.upsert', + 'contacts.update', + 'messages.upsert', + 'messages.update', + 'messages.delete', + 'messages.reaction', + 'message-receipt.update', + 'groups.update', +] as const + +type BufferableEvent = typeof BUFFERABLE_EVENT[number] + +const BUFFERABLE_EVENT_SET = new Set(BUFFERABLE_EVENT) + +type BaileysBufferableEventEmitter = BaileysEventEmitter & { + /** starts buffering events, call flush() to release them */ + buffer(): void + /** flushes all buffered events */ + flush(): Promise + /** waits for the task to complete, before releasing the buffer */ + processInBuffer(task: Promise) +} + +/** + * The event buffer logically consolidates different events into a single event + * making the data processing more efficient. + * @param ev the baileys event emitter + */ +export const makeEventBuffer = ( + ev: BaileysEventEmitter, + logger: Logger +): BaileysBufferableEventEmitter => { + + let data = makeBufferData() + let isBuffering = false + + let preBufferTask: Promise = Promise.resolve() + + return { + emit(event: BaileysEvent, evData: BaileysEventMap[T]) { + if(isBuffering && BUFFERABLE_EVENT_SET.has(event)) { + append(data, event as any, evData, logger) + return true + } + + return ev.emit(event, evData) + }, + processInBuffer(task) { + if(isBuffering) { + preBufferTask = Promise.allSettled([ preBufferTask, task ]) + } + }, + buffer() { + logger.trace('buffering events') + isBuffering = true + }, + async flush() { + if(!isBuffering) { + return + } + + logger.trace('releasing buffered events...') + await preBufferTask + + isBuffering = false + flush(data, ev) + data = makeBufferData() + + logger.trace('released buffered events') + }, + on: (...args) => ev.on(...args), + off: (...args) => ev.off(...args), + removeAllListeners: (...args) => ev.removeAllListeners(...args), + } +} + +const makeBufferData = (): BufferedEventData => { + return { + chatUpserts: { }, + chatUpdates: { }, + chatDeletes: new Set(), + contactUpserts: { }, + contactUpdates: { }, + messageUpserts: { }, + messageUpdates: { }, + messageReactions: { }, + messageDeletes: { }, + messageReceipts: { }, + groupUpdates: { } + } +} + +function append( + data: BufferedEventData, + event: E, + eventData: any, + logger: Logger +) { + switch (event) { + case 'chats.upsert': + for(const chat of eventData as Chat[]) { + let upsert = data.chatUpserts[chat.id] || { } as Chat + upsert = concatChats(upsert, chat) + if(data.chatUpdates[chat.id]) { + logger.debug({ chatId: chat.id }, 'absorbed chat update in chat upsert') + upsert = concatChats(data.chatUpdates[chat.id] as Chat, upsert) + delete data.chatUpdates[chat.id] + } + + data.chatUpserts[chat.id] = upsert + } + + break + case 'chats.update': + for(const update of eventData as Partial[]) { + const upsert = data.chatUpserts[update.id!] + if(upsert) { + concatChats(upsert, update) + } else { + const chatUpdate = data.chatUpdates[update.id] || { } + data.chatUpdates[update.id] = concatChats(chatUpdate, update) + } + } + + break + case 'chats.delete': + for(const chatId of eventData as string[]) { + data.chatDeletes.add(chatId) + if(data.chatUpdates[chatId]) { + delete data.chatUpdates[chatId] + } + + if(data.chatUpserts[chatId]) { + delete data.chatUpserts[chatId] + } + } + + break + case 'contacts.upsert': + for(const contact of eventData as Contact[]) { + let upsert = data.contactUpserts[contact.id] || { } as Contact + upsert = Object.assign(upsert, contact) + if(data.contactUpdates[contact.id]) { + upsert = Object.assign(data.contactUpdates[contact.id], upsert) + delete data.contactUpdates[contact.id] + } + + data.contactUpserts[contact.id] = upsert + } + + break + case 'contacts.update': + const contactUpdates = eventData as BaileysEventMap['contacts.update'] + for(const update of contactUpdates) { + const upsert = data.contactUpserts[update.id!] + if(upsert) { + Object.assign(upsert, update) + } else { + const contactUpdate = data.contactUpdates[update.id] || { } + data.contactUpdates[update.id] = Object.assign(contactUpdate, update) + } + } + + break + case 'messages.upsert': + const { messages, type } = eventData as BaileysEventMap['messages.upsert'] + for(const message of messages) { + const key = stringifyMessageKey(message.key) + const existing = data.messageUpserts[key] + if(existing) { + message.messageTimestamp = existing.message.messageTimestamp + } + + data.messageUpserts[key] = { + message, + type: type === 'notify' || existing?.type === 'notify' + ? 'notify' + : type + } + } + + break + case 'messages.update': + const msgUpdates = eventData as BaileysEventMap['messages.update'] + for(const { key, update } of msgUpdates) { + const keyStr = stringifyMessageKey(key) + const existing = data.messageUpserts[keyStr] + if(existing) { + Object.assign(existing.message, update) + // if the message was received & read by us + // the chat counter must have been incremented + // so we need to decrement it + if(update.status === WAMessageStatus.READ && !key.fromMe) { + decrementChatReadCounterIfMsgDidUnread(existing.message) + } + } else { + const msgUpdate = data.messageUpdates[keyStr] || { key, update: { } } + Object.assign(msgUpdate.update, update) + data.messageUpdates[keyStr] = msgUpdate + } + } + + break + case 'messages.delete': + const deleteData = eventData as BaileysEventMap['messages.delete'] + if('keys' in deleteData) { + const { keys } = deleteData + for(const key of keys) { + const keyStr = stringifyMessageKey(key) + data.messageDeletes[keyStr] = key + + if(data.messageUpserts[keyStr]) { + delete data.messageUpserts[keyStr] + } + + if(data.messageUpdates[keyStr]) { + delete data.messageUpdates[keyStr] + } + } + } else { + // TODO: add support + } + + break + case 'messages.reaction': + const reactions = eventData as BaileysEventMap['messages.reaction'] + for(const { key, reaction } of reactions) { + const keyStr = stringifyMessageKey(key) + const existing = data.messageUpserts[keyStr] + if(existing) { + updateMessageWithReaction(existing.message, reaction) + } else { + data.messageReactions[keyStr] = data.messageReactions[keyStr] + || { key, reactions: [] } + updateMessageWithReaction(data.messageReactions[keyStr], reaction) + } + } + + break + case 'message-receipt.update': + const receipts = eventData as BaileysEventMap['message-receipt.update'] + for(const { key, receipt } of receipts) { + const keyStr = stringifyMessageKey(key) + const existing = data.messageUpserts[keyStr] + if(existing) { + updateMessageWithReceipt(existing.message, receipt) + } else { + data.messageReceipts[keyStr] = data.messageReceipts[keyStr] + || { key, userReceipt: [] } + updateMessageWithReceipt(data.messageReceipts[keyStr], receipt) + } + } + + break + case 'groups.update': + const groupUpdates = eventData as BaileysEventMap['groups.update'] + for(const update of groupUpdates) { + const groupUpdate = data.groupUpdates[update.id] || { } + data.groupUpdates[update.id] = Object.assign(groupUpdate, update) + } + + break + default: + throw new Error(`"${event}" cannot be buffered`) + } + + function decrementChatReadCounterIfMsgDidUnread(message: WAMessage) { + // decrement chat unread counter + // if the message has already been marked read by us + const chatId = message.key.remoteJid + const chat = data.chatUpdates[chatId] || data.chatUpserts[chatId] + if( + isRealMessage(message, false) + && shouldIncrementChatUnread(message) + && typeof chat.unreadCount !== 'undefined' + && chat.unreadCount > 0 + ) { + logger.debug({ chatId: chat.id }, 'decrementing chat counter') + chat.unreadCount -= 1 + if(chat.unreadCount === 0) { + delete chat.unreadCount + } + } + } +} + +function flush(data: BufferedEventData, ev: BaileysEventEmitter) { + const chatUpsertList = Object.values(data.chatUpserts) + chatUpsertList.length && ev.emit('chats.upsert', chatUpsertList) + + const chatUpdateList = Object.values(data.chatUpdates) + chatUpdateList.length && ev.emit('chats.update', chatUpdateList) + + const chatDeleteList = Array.from(data.chatDeletes) + chatDeleteList.length && ev.emit('chats.delete', chatDeleteList) + + const messageUpsertList = Object.values(data.messageUpserts) + if(messageUpsertList.length) { + const appends: WAMessage[] = [] + const notifys: WAMessage[] = [] + for(const { message, type } of messageUpsertList) { + const arr = type === 'append' ? appends : notifys + arr.push(message) + } + + if(appends.length) { + ev.emit('messages.upsert', { type: 'append', messages: appends }) + } + + if(notifys.length) { + ev.emit('messages.upsert', { type: 'notify', messages: notifys }) + } + } + + const messageUpdateList = Object.values(data.messageUpdates) + messageUpdateList.length && ev.emit('messages.update', messageUpdateList) + + const messageDeleteList = Object.values(data.messageDeletes) + messageDeleteList.length && ev.emit('messages.delete', { keys: messageDeleteList }) + + const messageReactionList = Object.values(data.messageReactions).flatMap( + ({ key, reactions }) => reactions.flatMap(reaction => ({ key, reaction })) + ) + messageReactionList.length && ev.emit('messages.reaction', messageReactionList) + + const messageReceiptList = Object.values(data.messageReceipts).flatMap( + ({ key, userReceipt }) => userReceipt.flatMap(receipt => ({ key, receipt })) + ) + messageReceiptList.length && ev.emit('message-receipt.update', messageReceiptList) + + const contactUpsertList = Object.values(data.contactUpserts) + contactUpsertList.length && ev.emit('contacts.upsert', contactUpsertList) + + const contactUpdateList = Object.values(data.contactUpdates) + contactUpdateList.length && ev.emit('contacts.update', contactUpdateList) + + const groupUpdateList = Object.values(data.groupUpdates) + groupUpdateList.length && ev.emit('groups.update', groupUpdateList) +} + +function concatChats>(a: C, b: C) { + if(typeof a.unreadCount !== 'undefined' && typeof b.unreadCount !== 'undefined') { + b = { ...b } + if(b.unreadCount >= 0) { + b.unreadCount = Math.max(b.unreadCount, 0) + Math.max(a.unreadCount, 0) + } + } + + return Object.assign(a, b) +} + +const stringifyMessageKey = (key: proto.IMessageKey) => `${key.remoteJid},${key.id},${key.fromMe ? '1' : '0'}` \ No newline at end of file diff --git a/src/Utils/history.ts b/src/Utils/history.ts index 142260b..ac7d495 100644 --- a/src/Utils/history.ts +++ b/src/Utils/history.ts @@ -4,6 +4,7 @@ import { proto } from '../../WAProto' import { Chat, Contact, InitialReceivedChatsState } from '../Types' import { isJidUser } from '../WABinary' import { toNumber } from './generics' +import { normalizeMessageContent } from './messages' import { downloadContentFromMessage } from './messages-media' const inflatePromise = promisify(inflate) @@ -102,4 +103,11 @@ export const downloadAndProcessHistorySyncNotification = async( ) => { const historyMsg = await downloadHistory(msg) return processHistoryMessage(historyMsg, historyCache, recvChats) +} + +export const isHistoryMsg = (message: proto.IMessage) => { + const normalizedContent = !!message ? normalizeMessageContent(message) : undefined + const isAnyHistoryMsg = !!normalizedContent?.protocolMessage?.historySyncNotification + + return isAnyHistoryMsg } \ No newline at end of file diff --git a/src/Utils/index.ts b/src/Utils/index.ts index 6e6ad5e..e6383de 100644 --- a/src/Utils/index.ts +++ b/src/Utils/index.ts @@ -1,5 +1,5 @@ -export * from './decode-wa-message' export * from './generics' +export * from './decode-wa-message' export * from './messages' export * from './messages-media' export * from './validate-connection' @@ -14,4 +14,5 @@ export * from './legacy-msgs' export * from './baileys-event-stream' export * from './use-single-file-auth-state' export * from './use-multi-file-auth-state' -export * from './link-preview' \ No newline at end of file +export * from './link-preview' +export * from './event-buffer' \ No newline at end of file diff --git a/src/Utils/messages.ts b/src/Utils/messages.ts index db297d6..444bbba 100644 --- a/src/Utils/messages.ts +++ b/src/Utils/messages.ts @@ -17,6 +17,7 @@ import { WAMediaUpload, WAMessage, WAMessageContent, + WAMessageKey, WAMessageStatus, WAProto, WATextMessage, @@ -574,7 +575,7 @@ export const getDevice = (id: string) => { } /** Upserts a receipt in the message */ -export const updateMessageWithReceipt = (msg: WAMessage, receipt: MessageUserReceipt) => { +export const updateMessageWithReceipt = (msg: Pick, receipt: MessageUserReceipt) => { msg.userReceipt = msg.userReceipt || [] const recp = msg.userReceipt.find(m => m.userJid === receipt.userJid) if(recp) { @@ -584,6 +585,23 @@ export const updateMessageWithReceipt = (msg: WAMessage, receipt: MessageUserRec } } +const getKeyAuthor = (key: WAMessageKey | undefined | null) => ( + (key?.fromMe ? 'me' : key?.participant || key?.remoteJid) || '' +) + +/** Update the message with a new reaction */ +export const updateMessageWithReaction = (msg: Pick, reaction: proto.IReaction) => { + const authorID = getKeyAuthor(reaction.key) + + const reactions = (msg.reactions || []) + .filter(r => getKeyAuthor(r.key) !== authorID) + if(reaction.text) { + reactions.push(reaction) + } + + msg.reactions = reactions +} + /** Given a list of message keys, aggregates them by chat & sender. Useful for sending read receipts in bulk */ export const aggregateMessageKeysNotFromMe = (keys: proto.IMessageKey[]) => { const keyMap: { [id: string]: { jid: string, participant: string | undefined, messageIds: string[] } } = { } diff --git a/src/Utils/process-message.ts b/src/Utils/process-message.ts index c41579a..6f61248 100644 --- a/src/Utils/process-message.ts +++ b/src/Utils/process-message.ts @@ -38,6 +38,21 @@ export const cleanMessage = (message: proto.IWebMessageInfo, meId: string) => { } } +export const isRealMessage = (message: proto.IWebMessageInfo, treatCiphertextMessagesAsReal: boolean) => { + const normalizedContent = normalizeMessageContent(message.message) + return ( + !!normalizedContent + || MSG_MISSED_CALL_TYPES.has(message.messageStubType) + || (message.messageStubType === WAMessageStubType.CIPHERTEXT && treatCiphertextMessagesAsReal) + ) + && !normalizedContent?.protocolMessage + && !normalizedContent?.reactionMessage +} + +export const shouldIncrementChatUnread = (message: proto.IWebMessageInfo) => ( + !message.key.fromMe && !message.messageStubType +) + const processMessage = async( message: proto.IWebMessageInfo, { downloadHistory, historyCache, recvChats, creds, keyStore, logger, treatCiphertextMessagesAsReal }: ProcessMessageContext @@ -48,19 +63,10 @@ const processMessage = async( const chat: Partial = { id: jidNormalizedUser(message.key.remoteJid) } - const normalizedContent = !!message.message && normalizeMessageContent(message.message) - if( - ( - !!normalizedContent - || MSG_MISSED_CALL_TYPES.has(message.messageStubType) - || (message.messageStubType === WAMessageStubType.CIPHERTEXT && treatCiphertextMessagesAsReal) - ) - && !normalizedContent?.protocolMessage - && !normalizedContent?.reactionMessage - ) { + if(isRealMessage(message, treatCiphertextMessagesAsReal)) { chat.conversationTimestamp = toNumber(message.messageTimestamp) // only increment unread count if not CIPHERTEXT and from another person - if(!message.key.fromMe && !message.messageStubType) { + if(shouldIncrementChatUnread(message)) { chat.unreadCount = (chat.unreadCount || 0) + 1 } @@ -153,12 +159,10 @@ const processMessage = async( ...content.reactionMessage, key: message.key, } - const operation = content.reactionMessage?.text ? 'add' : 'remove' - map['messages.reaction'] = { + map['messages.reaction'] = [{ reaction, key: content.reactionMessage!.key!, - operation - } + }] } else if(message.messageStubType) { const jid = message.key!.remoteJid! //let actor = whatsappID (message.participant)