From ab9b2328389b7a185c8d31c15aa393e53bc8ea94 Mon Sep 17 00:00:00 2001 From: Adhiraj Singh Date: Tue, 29 Mar 2022 14:04:14 +0530 Subject: [PATCH] refactor: move processMessage out of socket --- src/Socket/messages-recv.ts | 213 +++++++---------------------------- src/Utils/generics.ts | 17 ++- src/Utils/process-message.ts | 167 +++++++++++++++++++++++++++ 3 files changed, 221 insertions(+), 176 deletions(-) create mode 100644 src/Utils/process-message.ts diff --git a/src/Socket/messages-recv.ts b/src/Socket/messages-recv.ts index 4dcef63..1cbd697 100644 --- a/src/Socket/messages-recv.ts +++ b/src/Socket/messages-recv.ts @@ -1,9 +1,10 @@ import { proto } from '../../WAProto' import { KEY_BUNDLE_TYPE } from '../Defaults' -import { BaileysEventMap, Chat, GroupMetadata, MessageUserReceipt, ParticipantAction, SocketConfig, WAMessageStubType } from '../Types' -import { decodeMessageStanza, delay, downloadAndProcessHistorySyncNotification, encodeBigEndian, generateSignalPubKey, normalizeMessageContent, toNumber, xmppPreKey, xmppSignedPreKey } from '../Utils' +import { BaileysEventMap, MessageUserReceipt, SocketConfig, WAMessageStubType } from '../Types' +import { decodeMessageStanza, delay, encodeBigEndian, generateSignalPubKey, getStatusFromReceiptType, normalizeMessageContent, xmppPreKey, xmppSignedPreKey } from '../Utils' import { makeKeyedMutex, makeMutex } from '../Utils/make-mutex' +import processMessage from '../Utils/process-message' import { areJidsSameUser, BinaryNode, BinaryNodeAttributes, getAllBinaryNodeChildren, getBinaryNodeChildren, isJidGroup, jidDecode, jidEncode, jidNormalizedUser } from '../WABinary' import { makeChatsSocket } from './chats' import { extractGroupMetadata } from './groups' @@ -130,142 +131,40 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { }) } - const processMessage = async(message: proto.IWebMessageInfo, chatUpdate: Partial) => { - const content = normalizeMessageContent(message.message) - const protocolMsg = content?.protocolMessage - if(protocolMsg) { - switch (protocolMsg.type) { - case proto.ProtocolMessage.ProtocolMessageType.HISTORY_SYNC_NOTIFICATION: - const histNotification = protocolMsg!.historySyncNotification - - logger.info({ histNotification, id: message.key.id }, 'got history notification') - - const meJid = authState.creds.me!.id - await sendNode({ - tag: 'receipt', - attrs: { - id: message.key.id, - type: 'hist_sync', - to: jidEncode(jidDecode(meJid).user, 'c.us') - } - }) - - const { chats, contacts, messages, isLatest } = await downloadAndProcessHistorySyncNotification(histNotification, historyCache) - - if(chats.length) { - ev.emit('chats.set', { chats, isLatest }) - } - - if(messages.length) { - ev.emit('messages.set', { messages, isLatest }) - } - - if(contacts.length) { - ev.emit('contacts.set', { contacts }) - } - - if(isLatest) { - resyncMainAppState() - } - - break - case proto.ProtocolMessage.ProtocolMessageType.APP_STATE_SYNC_KEY_SHARE: - const keys = protocolMsg.appStateSyncKeyShare!.keys - if(keys?.length) { - let newAppStateSyncKeyId = '' - await authState.keys.transaction( - async() => { - for(const { keyData, keyId } of keys) { - const strKeyId = Buffer.from(keyId.keyId!).toString('base64') - - logger.info({ strKeyId }, 'injecting new app state sync key') - await authState.keys.set({ 'app-state-sync-key': { [strKeyId]: keyData } }) - - newAppStateSyncKeyId = strKeyId - } - } - ) - ev.emit('creds.update', { myAppStateKeyId: newAppStateSyncKeyId }) - } else { - logger.info({ protocolMsg }, 'recv app state sync with 0 keys') - } - - break - case proto.ProtocolMessage.ProtocolMessageType.REVOKE: - ev.emit('messages.update', [ - { - key: { - ...message.key, - id: protocolMsg.key!.id - }, - update: { message: null, messageStubType: WAMessageStubType.REVOKE, key: message.key } - } - ]) - break - case proto.ProtocolMessage.ProtocolMessageType.EPHEMERAL_SETTING: - chatUpdate.ephemeralSettingTimestamp = toNumber(message.messageTimestamp) - chatUpdate.ephemeralExpiration = protocolMsg.ephemeralExpiration || null - break - } - } else if(content?.reactionMessage) { - const reaction: proto.IReaction = { - ...content.reactionMessage, - key: message.key, - } - const operation = content.reactionMessage?.text ? 'add' : 'remove' - ev.emit( - 'messages.reaction', - { reaction, key: content.reactionMessage!.key!, operation } - ) - } else if(message.messageStubType) { - const meJid = authState.creds.me!.id - const jid = message.key!.remoteJid! - //let actor = whatsappID (message.participant) - let participants: string[] - const emitParticipantsUpdate = (action: ParticipantAction) => ( - ev.emit('group-participants.update', { id: jid, participants, action }) - ) - const emitGroupUpdate = (update: Partial) => { - ev.emit('groups.update', [ { id: jid, ...update } ]) - } - - switch (message.messageStubType) { - case WAMessageStubType.GROUP_PARTICIPANT_LEAVE: - case WAMessageStubType.GROUP_PARTICIPANT_REMOVE: - participants = message.messageStubParameters - emitParticipantsUpdate('remove') - // mark the chat read only if you left the group - if(participants.includes(meJid)) { - chatUpdate.readOnly = true - } - - break - case WAMessageStubType.GROUP_PARTICIPANT_ADD: - case WAMessageStubType.GROUP_PARTICIPANT_INVITE: - case WAMessageStubType.GROUP_PARTICIPANT_ADD_REQUEST_JOIN: - participants = message.messageStubParameters - if(participants.includes(meJid)) { - chatUpdate.readOnly = false - } - - emitParticipantsUpdate('add') - break - case WAMessageStubType.GROUP_CHANGE_ANNOUNCE: - const announceValue = message.messageStubParameters[0] - emitGroupUpdate({ announce: announceValue === 'true' || announceValue === 'on' }) - break - case WAMessageStubType.GROUP_CHANGE_RESTRICT: - const restrictValue = message.messageStubParameters[0] - emitGroupUpdate({ restrict: restrictValue === 'true' || restrictValue === 'on' }) - break - case WAMessageStubType.GROUP_CHANGE_SUBJECT: - chatUpdate.name = message.messageStubParameters[0] - emitGroupUpdate({ subject: chatUpdate.name }) - break - } + const emitEventsFromMap = (map: Partial>) => { + for(const key in map) { + ev.emit(key as any, map[key]) } } + const processMessageLocal = async(message: proto.IWebMessageInfo) => { + const meId = authState.creds.me!.id + // send ack for history message + const isAnyHistoryMsg = !!message.message && !!normalizeMessageContent(message.message)?.protocolMessage?.historySyncNotification + if(isAnyHistoryMsg) { + await sendNode({ + tag: 'receipt', + attrs: { + id: message.key.id, + type: 'hist_sync', + to: jidNormalizedUser(meId) + } + }) + } + + // process message and emit events + const newEvents = await processMessage( + message, + { historyCache, meId, keyStore: authState.keys, logger, treatCiphertextMessagesAsReal } + ) + + if(newEvents['chats.set']?.isLatest) { + resyncMainAppState() + } + + return newEvents + } + const processNotification = (node: BinaryNode): Partial => { const result: Partial = { } const [child] = getAllBinaryNodeChildren(node) @@ -553,8 +452,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { const handleUpsertedMessages = async({ messages, type }: BaileysEventMap['messages.upsert']) => { if(type === 'notify' || type === 'append') { - const chatId = jidNormalizedUser(messages[0].key.remoteJid) - const chat: Partial = { id: chatId } const contactNameUpdates: { [_: string]: string } = { } for(const msg of messages) { const normalizedChatId = jidNormalizedUser(msg.key.remoteJid) @@ -569,30 +466,11 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } } - await processingMutex.mutex( + const events = await processingMutex.mutex( 'p-' + normalizedChatId, - () => processMessage(msg, chat) + () => processMessageLocal(msg) ) - - const normalizedContent = msg.message ? normalizeMessageContent(msg.message) : undefined - - if( - ( - !!normalizedContent || - (msg.messageStubType === WAMessageStubType.CIPHERTEXT && treatCiphertextMessagesAsReal) - ) - && !normalizedContent?.protocolMessage - && !normalizedContent?.reactionMessage - ) { - chat.conversationTimestamp = toNumber(msg.messageTimestamp) - if(!msg.key.fromMe) { - chat.unreadCount = (chat.unreadCount || 0) + 1 - } - } - } - - if(Object.keys(chat).length > 1) { - ev.emit('chats.update', [ chat ]) + emitEventsFromMap(events) } if(Object.keys(contactNameUpdates).length) { @@ -627,19 +505,4 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { }) return { ...sock, processMessage, sendMessageAck, sendRetryRequest } -} - -const STATUS_MAP: { [_: string]: proto.WebMessageInfo.WebMessageInfoStatus } = { - 'played': proto.WebMessageInfo.WebMessageInfoStatus.PLAYED, - 'read': proto.WebMessageInfo.WebMessageInfoStatus.READ, - 'read-self': proto.WebMessageInfo.WebMessageInfoStatus.READ -} - -const getStatusFromReceiptType = (type: string | undefined) => { - const status = STATUS_MAP[type] - if(typeof type === 'undefined') { - return proto.WebMessageInfo.WebMessageInfoStatus.DELIVERY_ACK - } - - return status -} +} \ No newline at end of file diff --git a/src/Utils/generics.ts b/src/Utils/generics.ts index db1bdb3..5fb62a9 100644 --- a/src/Utils/generics.ts +++ b/src/Utils/generics.ts @@ -253,4 +253,19 @@ export const fetchLatestBaileysVersion = async() => { error } } -} \ No newline at end of file +} + +const STATUS_MAP: { [_: string]: proto.WebMessageInfo.WebMessageInfoStatus } = { + 'played': proto.WebMessageInfo.WebMessageInfoStatus.PLAYED, + 'read': proto.WebMessageInfo.WebMessageInfoStatus.READ, + 'read-self': proto.WebMessageInfo.WebMessageInfoStatus.READ +} + +export const getStatusFromReceiptType = (type: string | undefined) => { + const status = STATUS_MAP[type] + if(typeof type === 'undefined') { + return proto.WebMessageInfo.WebMessageInfoStatus.DELIVERY_ACK + } + + return status +} diff --git a/src/Utils/process-message.ts b/src/Utils/process-message.ts new file mode 100644 index 0000000..95afe71 --- /dev/null +++ b/src/Utils/process-message.ts @@ -0,0 +1,167 @@ +import type { Logger } from 'pino' +import { proto } from '../../WAProto' +import { BaileysEventMap, Chat, GroupMetadata, ParticipantAction, SignalKeyStoreWithTransaction, WAMessageStubType } from '../Types' +import { downloadAndProcessHistorySyncNotification, normalizeMessageContent, toNumber } from '../Utils' +import { areJidsSameUser, jidNormalizedUser } from '../WABinary' + +type ProcessMessageContext = { + historyCache: Set, + meId: string, + keyStore: SignalKeyStoreWithTransaction, + logger?: Logger + treatCiphertextMessagesAsReal?: boolean +} + +const processMessage = async( + message: proto.IWebMessageInfo, + { historyCache, meId, keyStore, logger, treatCiphertextMessagesAsReal }: ProcessMessageContext +) => { + const map: Partial> = { } + + const chat: Partial = { id: jidNormalizedUser(message.key.remoteJid) } + + const normalizedContent = !!message.message && normalizeMessageContent(message.message) + if( + ( + !!normalizedContent || + (message.messageStubType === WAMessageStubType.CIPHERTEXT && treatCiphertextMessagesAsReal) + ) + && !normalizedContent?.protocolMessage + && !normalizedContent?.reactionMessage + ) { + chat.conversationTimestamp = toNumber(message.messageTimestamp) + if(!message.key.fromMe) { + chat.unreadCount = (chat.unreadCount || 0) + 1 + } + } + + const content = normalizeMessageContent(message.message) + const protocolMsg = content?.protocolMessage + if(protocolMsg) { + switch (protocolMsg.type) { + case proto.ProtocolMessage.ProtocolMessageType.HISTORY_SYNC_NOTIFICATION: + const histNotification = protocolMsg!.historySyncNotification + + logger?.info({ histNotification, id: message.key.id }, 'got history notification') + + const { chats, contacts, messages, isLatest } = await downloadAndProcessHistorySyncNotification(histNotification, historyCache) + + if(chats.length) { + map['chats.set'] = { chats, isLatest } + } + + if(messages.length) { + map['messages.set'] = { messages, isLatest } + } + + if(contacts.length) { + map['contacts.set'] = { contacts } + } + + break + case proto.ProtocolMessage.ProtocolMessageType.APP_STATE_SYNC_KEY_SHARE: + const keys = protocolMsg.appStateSyncKeyShare!.keys + if(keys?.length) { + let newAppStateSyncKeyId = '' + await keyStore.transaction( + async() => { + for(const { keyData, keyId } of keys) { + const strKeyId = Buffer.from(keyId.keyId!).toString('base64') + + logger?.info({ strKeyId }, 'injecting new app state sync key') + await keyStore.set({ 'app-state-sync-key': { [strKeyId]: keyData } }) + + newAppStateSyncKeyId = strKeyId + } + } + ) + + map['creds.update'] = { myAppStateKeyId: newAppStateSyncKeyId } + } else { + logger?.info({ protocolMsg }, 'recv app state sync with 0 keys') + } + + break + case proto.ProtocolMessage.ProtocolMessageType.REVOKE: + map['messages.update'] = [ + { + key: { + ...message.key, + id: protocolMsg.key!.id + }, + update: { message: null, messageStubType: WAMessageStubType.REVOKE, key: message.key } + } + ] + break + case proto.ProtocolMessage.ProtocolMessageType.EPHEMERAL_SETTING: + Object.assign(chat, { + ephemeralSettingTimestamp: toNumber(message.messageTimestamp), + ephemeralExpiration: protocolMsg.ephemeralExpiration || null + }) + break + } + } else if(content?.reactionMessage) { + const reaction: proto.IReaction = { + ...content.reactionMessage, + key: message.key, + } + const operation = content.reactionMessage?.text ? 'add' : 'remove' + map['messages.reaction'] = { reaction, key: content.reactionMessage!.key!, operation } + } else if(message.messageStubType) { + const jid = message.key!.remoteJid! + //let actor = whatsappID (message.participant) + let participants: string[] + const emitParticipantsUpdate = (action: ParticipantAction) => ( + map['group-participants.update'] = { id: jid, participants, action } + ) + const emitGroupUpdate = (update: Partial) => { + map['groups.update'] = [ { id: jid, ...update } ] + } + + const participantsIncludesMe = () => participants.find(jid => areJidsSameUser(meId, jid)) + + switch (message.messageStubType) { + case WAMessageStubType.GROUP_PARTICIPANT_LEAVE: + case WAMessageStubType.GROUP_PARTICIPANT_REMOVE: + participants = message.messageStubParameters + emitParticipantsUpdate('remove') + // mark the chat read only if you left the group + if(participantsIncludesMe()) { + chat.readOnly = true + } + + break + case WAMessageStubType.GROUP_PARTICIPANT_ADD: + case WAMessageStubType.GROUP_PARTICIPANT_INVITE: + case WAMessageStubType.GROUP_PARTICIPANT_ADD_REQUEST_JOIN: + participants = message.messageStubParameters + if(participantsIncludesMe()) { + chat.readOnly = false + } + + emitParticipantsUpdate('add') + break + case WAMessageStubType.GROUP_CHANGE_ANNOUNCE: + const announceValue = message.messageStubParameters[0] + emitGroupUpdate({ announce: announceValue === 'true' || announceValue === 'on' }) + break + case WAMessageStubType.GROUP_CHANGE_RESTRICT: + const restrictValue = message.messageStubParameters[0] + emitGroupUpdate({ restrict: restrictValue === 'true' || restrictValue === 'on' }) + break + case WAMessageStubType.GROUP_CHANGE_SUBJECT: + const name = message.messageStubParameters[0] + chat.name = name + emitGroupUpdate({ subject: name }) + break + } + } + + if(Object.keys(chat).length) { + map['chats.update'] = [chat] + } + + return map +} + +export default processMessage \ No newline at end of file