diff --git a/Example/example.ts b/Example/example.ts index 4ac0e47..9aec162 100644 --- a/Example/example.ts +++ b/Example/example.ts @@ -56,46 +56,80 @@ const startSock = async() => { await sock.sendMessage(jid, msg) } - sock.ev.on('call', item => console.log('recv call event', item)) - sock.ev.on('chats.set', item => console.log(`recv ${item.chats.length} chats (is latest: ${item.isLatest})`)) - sock.ev.on('messages.set', item => console.log(`recv ${item.messages.length} messages (is latest: ${item.isLatest})`)) - sock.ev.on('contacts.set', item => console.log(`recv ${item.contacts.length} contacts`)) + // the process function lets you process all events that just occurred + // efficiently in a batch + sock.ev.process( + // events is a map for event name => event data + async(events) => { + if(events['connection.update']) { + const update = events['connection.update'] + const { connection, lastDisconnect } = update + if(connection === 'close') { + // reconnect if not logged out + if((lastDisconnect?.error as Boom)?.output?.statusCode !== DisconnectReason.loggedOut) { + startSock() + } else { + console.log('Connection closed. You are logged out.') + } + } - sock.ev.on('messages.upsert', async m => { - console.log(JSON.stringify(m, undefined, 2)) + console.log('connection update', update) + } - const msg = m.messages[0] - if(!msg.key.fromMe && m.type === 'notify' && doReplies) { - console.log('replying to', m.messages[0].key.remoteJid) - await sock!.sendReadReceipt(msg.key.remoteJid, msg.key.participant, [msg.key.id]) - await sendMessageWTyping({ text: 'Hello there!' }, msg.key.remoteJid) - } + if(events['creds.update']) { + await saveCreds() + } - }) + if(events.call) { + console.log('recv call event', events.call) + } - sock.ev.on('messages.update', m => console.log(m)) - sock.ev.on('message-receipt.update', m => console.log(m)) - sock.ev.on('messages.reaction', m => console.log(m)) - sock.ev.on('presence.update', m => console.log(m)) - sock.ev.on('chats.update', m => console.log(m)) - sock.ev.on('chats.delete', m => console.log(m)) - sock.ev.on('contacts.upsert', m => console.log(m)) + if(events['chats.set']) { + const { chats, isLatest } = events['chats.set'] + console.log(`recv ${chats.length} chats (is latest: ${isLatest})`) + } - sock.ev.on('connection.update', (update) => { - const { connection, lastDisconnect } = update - if(connection === 'close') { - // reconnect if not logged out - if((lastDisconnect.error as Boom)?.output?.statusCode !== DisconnectReason.loggedOut) { - startSock() - } else { - console.log('Connection closed. You are logged out.') + if(events['messages.set']) { + const { messages, isLatest } = events['messages.set'] + console.log(`recv ${messages.length} messages (is latest: ${isLatest})`) + } + + if(events['contacts.set']) { + const { contacts, isLatest } = events['contacts.set'] + console.log(`recv ${contacts.length} contacts (is latest: ${isLatest})`) + } + + if(events['messages.upsert'] && events['messages.upsert'].type === 'notify') { + for(const msg of events['messages.upsert'].messages) { + if(!msg.key.fromMe && doReplies) { + console.log('replying to', msg.key.remoteJid) + await sock!.sendReadReceipt(msg.key.remoteJid!, msg.key.participant!, [msg.key.id!]) + await sendMessageWTyping({ text: 'Hello there!' }, msg.key.remoteJid!) + } + } + } + + if(events['messages.update']) { + console.log(events['messages.update']) + } + + if(events['message-receipt.update']) { + console.log(events['message-receipt.update']) + } + + if(events['messages.reaction']) { + console.log(events['messages.reaction']) + } + + if(events['presence.update']) { + console.log(events['presence.update']) + } + + if(events['chats.update']) { + console.log(events['chats.update']) } } - - console.log('connection update', update) - }) - // listen for when the auth credentials is updated - sock.ev.on('creds.update', saveCreds) + ) return sock } diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index 0942951..d6bea71 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -22,12 +22,10 @@ export const makeChatsSocket = (config: SocketConfig) => { sendNode, query, onUnexpectedError, - emitEventsFromMap, } = sock let privacySettings: { [_: string]: string } | undefined - const mutationMutex = makeMutex() /** this mutex ensures that the notifications (receipts, messages etc.) are processed in order */ const processingMutex = makeMutex() /** cache to ensure new history sync events do not have duplicate items */ @@ -527,7 +525,7 @@ export const makeChatsSocket = (config: SocketConfig) => { logger.debug('resyncing main app state') await ( - mutationMutex.mutex( + processingMutex.mutex( () => resyncAppState(ALL_WA_PATCH_NAMES, ctx) ) .catch(err => ( @@ -546,7 +544,7 @@ export const makeChatsSocket = (config: SocketConfig) => { let initial: LTHashState let encodeResult: { patch: proto.ISyncdPatch, state: LTHashState } - await mutationMutex.mutex( + await processingMutex.mutex( async() => { await authState.keys.transaction( async() => { @@ -694,12 +692,30 @@ export const makeChatsSocket = (config: SocketConfig) => { ]) } - const processMessageLocal = async(msg: proto.IWebMessageInfo) => { + const upsertMessage = async(msg: WAMessage, type: MessageUpsertType) => { + const startedBuffer = ev.buffer() + ev.emit('messages.upsert', { messages: [msg], type }) + + if(!!msg.pushName) { + let jid = msg.key.fromMe ? authState.creds.me!.id : (msg.key.participant || msg.key.remoteJid) + jid = jidNormalizedUser(jid) + + if(!msg.key.fromMe) { + ev.emit('contacts.update', [{ id: jid, notify: msg.pushName, verifiedName: msg.verifiedBizName }]) + } + + // update our pushname too + if(msg.key.fromMe && authState.creds.me?.name !== msg.pushName) { + ev.emit('creds.update', { me: { ...authState.creds.me!, name: msg.pushName! } }) + } + } + // process message and emit events - const newEvents = await processMessage( + await processMessage( msg, { downloadHistory, + ev, historyCache, recvChats, creds: authState.creds, @@ -717,28 +733,9 @@ export const makeChatsSocket = (config: SocketConfig) => { appStateSyncTimeout.start() } - return newEvents - } - - const upsertMessage = async(msg: WAMessage, type: MessageUpsertType) => { - ev.emit('messages.upsert', { messages: [msg], type }) - - if(!!msg.pushName) { - let jid = msg.key.fromMe ? authState.creds.me!.id : (msg.key.participant || msg.key.remoteJid) - jid = jidNormalizedUser(jid) - - if(!msg.key.fromMe) { - ev.emit('contacts.update', [{ id: jid, notify: msg.pushName, verifiedName: msg.verifiedBizName }]) - } - - // update our pushname too - if(msg.key.fromMe && authState.creds.me?.name !== msg.pushName) { - ev.emit('creds.update', { me: { ...authState.creds.me!, name: msg.pushName! } }) - } + if(startedBuffer) { + await ev.flush() } - - const events = await processMessageLocal(msg) - emitEventsFromMap(events) } ws.on('CB:presence', handlePresenceUpdate) @@ -777,7 +774,6 @@ export const makeChatsSocket = (config: SocketConfig) => { return { ...sock, - mutationMutex, processingMutex, fetchPrivacySettings, upsertMessage, diff --git a/src/Socket/groups.ts b/src/Socket/groups.ts index 87506bc..c86859d 100644 --- a/src/Socket/groups.ts +++ b/src/Socket/groups.ts @@ -155,6 +155,8 @@ export const makeGroupsSocket = (config: SocketConfig) => { admin: key.remoteJid! } }]) + + const started = ev.buffer() // if we have the full message key // update the invite message to be expired if(key.id) { @@ -193,6 +195,10 @@ export const makeGroupsSocket = (config: SocketConfig) => { 'notify' ) + if(started) { + await ev.flush() + } + return results.attrs.from }, groupGetInviteInfo: async(code: string) => { diff --git a/src/Socket/messages-recv.ts b/src/Socket/messages-recv.ts index 278be3f..ac03a14 100644 --- a/src/Socket/messages-recv.ts +++ b/src/Socket/messages-recv.ts @@ -20,7 +20,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { ev, authState, ws, - mutationMutex, processingMutex, upsertMessage, resyncAppState, @@ -249,7 +248,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { const update = getBinaryNodeChild(node, 'collection') if(update) { const name = update.attrs.name as WAPatchName - await mutationMutex.mutex(() => resyncAppState([name], undefined)) + await resyncAppState([name], undefined) } } @@ -591,10 +590,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } const protoMsg = proto.WebMessageInfo.fromObject(msg) - ev.emit( - 'messages.upsert', - { messages: [protoMsg], type: call.offline ? 'append' : 'notify' } - ) + upsertMessage(protoMsg, call.offline ? 'append' : 'notify') } }) diff --git a/src/Socket/socket.ts b/src/Socket/socket.ts index e622592..91abcce 100644 --- a/src/Socket/socket.ts +++ b/src/Socket/socket.ts @@ -1,10 +1,9 @@ import { Boom } from '@hapi/boom' -import EventEmitter from 'events' import { promisify } from 'util' import WebSocket from 'ws' import { proto } from '../../WAProto' import { DEF_CALLBACK_PREFIX, DEF_TAG_PREFIX, DEFAULT_ORIGIN, INITIAL_PREKEY_COUNT, MIN_PREKEY_COUNT } from '../Defaults' -import { AuthenticationCreds, BaileysEventEmitter, BaileysEventMap, DisconnectReason, SocketConfig } from '../Types' +import { DisconnectReason, SocketConfig } from '../Types' import { addTransactionCapability, bindWaitForConnectionUpdate, configureSuccessfulPairing, Curve, generateLoginNode, generateMdTagPrefix, generateRegistrationNode, getCodeFromWSError, getErrorCodeFromStreamError, getNextPreKeysNode, makeNoiseHandler, printQRIfNecessaryListener, promiseTimeout } from '../Utils' import { makeEventBuffer } from '../Utils/event-buffer' import { assertNodeErrorFree, BinaryNode, encodeBinaryNode, getBinaryNodeChild, getBinaryNodeChildren, S_WHATSAPP_NET } from '../WABinary' @@ -35,8 +34,7 @@ export const makeSocket = ({ agent }) ws.setMaxListeners(0) - const _ev = new EventEmitter() as BaileysEventEmitter - const ev = makeEventBuffer(_ev, logger) + const ev = makeEventBuffer(logger) /** ephemeral key pair used to encrypt/decrypt communication. Unique for each connection */ const ephemeralKeyPair = Curve.generateKeyPair() /** WA noise protocol wrapper */ @@ -386,12 +384,6 @@ export const makeSocket = ({ }) ) - const emitEventsFromMap = (map: Partial>) => { - for(const key in map) { - ev.emit(key as any, map[key]) - } - } - /** logout & invalidate connection */ const logout = async() => { const jid = authState.creds.me?.id @@ -555,7 +547,6 @@ export const makeSocket = ({ get user() { return authState.creds.me }, - emitEventsFromMap, generateMessageTag, query, waitForMessage, diff --git a/src/Utils/event-buffer.ts b/src/Utils/event-buffer.ts index 5e4bee8..7852a94 100644 --- a/src/Utils/event-buffer.ts +++ b/src/Utils/event-buffer.ts @@ -1,3 +1,4 @@ +import EventEmitter from 'events' import { Logger } from 'pino' import { proto } from '../../WAProto' import { AuthenticationCreds, BaileysEvent, BaileysEventEmitter, BaileysEventMap, BufferedEventData, Chat, Contact, WAMessage, WAMessageStatus } from '../Types' @@ -20,9 +21,20 @@ const BUFFERABLE_EVENT = [ type BufferableEvent = typeof BUFFERABLE_EVENT[number] +/** + * A map that contains a list of all events that have been triggered + * + * Note, this can contain different type of events + * this can make processing events extremely efficient -- since everything + * can be done in a single transaction + */ +type BaileysEventData = Partial> + const BUFFERABLE_EVENT_SET = new Set(BUFFERABLE_EVENT) type BaileysBufferableEventEmitter = BaileysEventEmitter & { + /** Use to process events in a batch */ + process(handler: (events: BaileysEventData) => void | Promise): (() => void) /** * starts buffering events, call flush() to release them * @returns true if buffering just started, false if it was already buffering @@ -39,24 +51,39 @@ type BaileysBufferableEventEmitter = BaileysEventEmitter & { * making the data processing more efficient. * @param ev the baileys event emitter */ -export const makeEventBuffer = ( - ev: BaileysEventEmitter, - logger: Logger -): BaileysBufferableEventEmitter => { +export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter => { + const ev = new EventEmitter() let data = makeBufferData() let isBuffering = false let preBufferTask: Promise = Promise.resolve() + // take the generic event and fire it as a baileys event + ev.on('event', (map: BaileysEventData) => { + for(const event in map) { + ev.emit(event, map[event]) + } + }) + return { + process(handler) { + const listener = (map: BaileysEventData) => { + handler(map) + } + + ev.on('event', listener) + return () => { + ev.off('event', listener) + } + }, emit(event: BaileysEvent, evData: BaileysEventMap[T]) { if(isBuffering && BUFFERABLE_EVENT_SET.has(event)) { append(data, event as any, evData, logger) return true } - return ev.emit(event, evData) + return ev.emit('event', { [event]: evData }) }, processInBuffer(task) { if(isBuffering) { @@ -79,7 +106,7 @@ export const makeEventBuffer = ( await preBufferTask isBuffering = false - flush(data, ev) + ev.emit('event', consolidateEvents(data)) data = makeBufferData() logger.trace('released buffered events') @@ -300,58 +327,73 @@ function append( } } -function flush(data: BufferedEventData, ev: BaileysEventEmitter) { +function consolidateEvents(data: BufferedEventData) { + const map: BaileysEventData = { } + const chatUpsertList = Object.values(data.chatUpserts) - chatUpsertList.length && ev.emit('chats.upsert', chatUpsertList) + if(chatUpsertList.length) { + map['chats.upsert'] = chatUpsertList + } const chatUpdateList = Object.values(data.chatUpdates) - chatUpdateList.length && ev.emit('chats.update', chatUpdateList) + if(chatUpdateList.length) { + map['chats.update'] = chatUpdateList + } const chatDeleteList = Array.from(data.chatDeletes) - chatDeleteList.length && ev.emit('chats.delete', chatDeleteList) + if(chatDeleteList.length) { + map['chats.delete'] = chatDeleteList + } const messageUpsertList = Object.values(data.messageUpserts) if(messageUpsertList.length) { - const appends: WAMessage[] = [] - const notifys: WAMessage[] = [] - for(const { message, type } of messageUpsertList) { - const arr = type === 'append' ? appends : notifys - arr.push(message) - } - - if(appends.length) { - ev.emit('messages.upsert', { type: 'append', messages: appends }) - } - - if(notifys.length) { - ev.emit('messages.upsert', { type: 'notify', messages: notifys }) + const type = messageUpsertList[0].type + map['messages.upsert'] = { + messages: messageUpsertList.map(m => m.message), + type } } const messageUpdateList = Object.values(data.messageUpdates) - messageUpdateList.length && ev.emit('messages.update', messageUpdateList) + if(messageUpdateList.length) { + map['messages.update'] = messageUpdateList + } const messageDeleteList = Object.values(data.messageDeletes) - messageDeleteList.length && ev.emit('messages.delete', { keys: messageDeleteList }) + if(messageDeleteList.length) { + map['messages.delete'] = { keys: messageDeleteList } + } const messageReactionList = Object.values(data.messageReactions).flatMap( ({ key, reactions }) => reactions.flatMap(reaction => ({ key, reaction })) ) - messageReactionList.length && ev.emit('messages.reaction', messageReactionList) + if(messageReactionList.length) { + map['messages.reaction'] = messageReactionList + } const messageReceiptList = Object.values(data.messageReceipts).flatMap( ({ key, userReceipt }) => userReceipt.flatMap(receipt => ({ key, receipt })) ) - messageReceiptList.length && ev.emit('message-receipt.update', messageReceiptList) + if(messageReceiptList.length) { + map['message-receipt.update'] = messageReceiptList + } const contactUpsertList = Object.values(data.contactUpserts) - contactUpsertList.length && ev.emit('contacts.upsert', contactUpsertList) + if(contactUpsertList.length) { + map['contacts.upsert'] = contactUpsertList + } const contactUpdateList = Object.values(data.contactUpdates) - contactUpdateList.length && ev.emit('contacts.update', contactUpdateList) + if(contactUpdateList.length) { + map['contacts.update'] = contactUpdateList + } const groupUpdateList = Object.values(data.groupUpdates) - groupUpdateList.length && ev.emit('groups.update', groupUpdateList) + if(groupUpdateList.length) { + map['groups.update'] = groupUpdateList + } + + return map } function concatChats>(a: C, b: C) { diff --git a/src/Utils/process-message.ts b/src/Utils/process-message.ts index 6f61248..9df4066 100644 --- a/src/Utils/process-message.ts +++ b/src/Utils/process-message.ts @@ -1,6 +1,6 @@ import type { Logger } from 'pino' import { proto } from '../../WAProto' -import { AuthenticationCreds, BaileysEventMap, Chat, GroupMetadata, InitialReceivedChatsState, ParticipantAction, SignalKeyStoreWithTransaction, WAMessageStubType } from '../Types' +import { AuthenticationCreds, BaileysEventEmitter, Chat, GroupMetadata, InitialReceivedChatsState, ParticipantAction, SignalKeyStoreWithTransaction, WAMessageStubType } from '../Types' import { downloadAndProcessHistorySyncNotification, normalizeMessageContent, toNumber } from '../Utils' import { areJidsSameUser, jidNormalizedUser } from '../WABinary' @@ -10,6 +10,7 @@ type ProcessMessageContext = { downloadHistory: boolean creds: AuthenticationCreds keyStore: SignalKeyStoreWithTransaction + ev: BaileysEventEmitter logger?: Logger treatCiphertextMessagesAsReal?: boolean } @@ -55,11 +56,10 @@ export const shouldIncrementChatUnread = (message: proto.IWebMessageInfo) => ( const processMessage = async( message: proto.IWebMessageInfo, - { downloadHistory, historyCache, recvChats, creds, keyStore, logger, treatCiphertextMessagesAsReal }: ProcessMessageContext + { downloadHistory, ev, historyCache, recvChats, creds, keyStore, logger, treatCiphertextMessagesAsReal }: ProcessMessageContext ) => { const meId = creds.me!.id const { accountSettings } = creds - const map: Partial> = { } const chat: Partial = { id: jidNormalizedUser(message.key.remoteJid) } @@ -90,25 +90,24 @@ const processMessage = async( const isLatest = historyCache.size === 0 && !creds.processedHistoryMessages?.length if(chats.length) { - map['chats.set'] = { chats, isLatest } + ev.emit('chats.set', { chats, isLatest }) } if(messages.length) { - map['messages.set'] = { messages, isLatest } + ev.emit('messages.set', { messages, isLatest }) } if(contacts.length) { - map['contacts.set'] = { contacts, isLatest } + ev.emit('contacts.set', { contacts, isLatest }) } if(didProcess) { - map['creds.update'] = { - ...(map['creds.update'] || {}), + ev.emit('creds.update', { processedHistoryMessages: [ ...(creds.processedHistoryMessages || []), - { key: message.key, timestamp: message.messageTimestamp } + { key: message.key, messageTimestamp: message.messageTimestamp } ] - } + }) } } @@ -130,14 +129,14 @@ const processMessage = async( } ) - map['creds.update'] = { myAppStateKeyId: newAppStateSyncKeyId } + ev.emit('creds.update', { myAppStateKeyId: newAppStateSyncKeyId }) } else { logger?.info({ protocolMsg }, 'recv app state sync with 0 keys') } break case proto.ProtocolMessage.ProtocolMessageType.REVOKE: - map['messages.update'] = [ + ev.emit('messages.update', [ { key: { ...message.key, @@ -145,7 +144,7 @@ const processMessage = async( }, update: { message: null, messageStubType: WAMessageStubType.REVOKE, key: message.key } } - ] + ]) break case proto.ProtocolMessage.ProtocolMessageType.EPHEMERAL_SETTING: Object.assign(chat, { @@ -159,19 +158,19 @@ const processMessage = async( ...content.reactionMessage, key: message.key, } - map['messages.reaction'] = [{ + ev.emit('messages.reaction', [{ reaction, key: content.reactionMessage!.key!, - }] + }]) } else if(message.messageStubType) { const jid = message.key!.remoteJid! //let actor = whatsappID (message.participant) let participants: string[] const emitParticipantsUpdate = (action: ParticipantAction) => ( - map['group-participants.update'] = { id: jid, participants, action } + ev.emit('group-participants.update', { id: jid, participants, action }) ) const emitGroupUpdate = (update: Partial) => { - map['groups.update'] = [ { id: jid, ...update } ] + ev.emit('groups.update', [{ id: jid, ...update }]) } const participantsIncludesMe = () => participants.find(jid => areJidsSameUser(meId, jid)) @@ -222,10 +221,8 @@ const processMessage = async( } if(Object.keys(chat).length > 1) { - map['chats.update'] = [chat] + ev.emit('chats.update', [chat]) } - - return map } export default processMessage \ No newline at end of file