diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index dec3838..eb2d3c9 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -1,7 +1,7 @@ import { Boom } from '@hapi/boom' import { proto } from '../../WAProto' -import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, InitialReceivedChatsState, LTHashState, MessageUpsertType, PresenceData, SocketConfig, SyncActionUpdates, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAMessage, WAPatchCreate, WAPatchName, WAPresence } from '../Types' -import { chatModificationToAppPatch, debouncedTimeout, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, isHistoryMsg, newAppStateChunk, newLTHashState, processSyncAction, syncActionUpdatesToEventMap } from '../Utils' +import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, InitialReceivedChatsState, LTHashState, MessageUpsertType, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAMessage, WAPatchCreate, WAPatchName, WAPresence } from '../Types' +import { chatModificationToAppPatch, debouncedTimeout, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, isHistoryMsg, newLTHashState, processSyncAction } from '../Utils' import { makeMutex } from '../Utils/make-mutex' import processMessage from '../Utils/process-message' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary' @@ -273,14 +273,12 @@ export const makeChatsSocket = (config: SocketConfig) => { }) } - const newAppStateChunkHandler = (collections: readonly WAPatchName[], recvChats: InitialReceivedChatsState | undefined) => { - const appStateChunk = newAppStateChunk(collections) + const newAppStateChunkHandler = (recvChats: InitialReceivedChatsState | undefined) => { return { - appStateChunk, onMutation(mutation: ChatMutation) { processSyncAction( mutation, - appStateChunk.updates, + ev, authState.creds.me, recvChats ? { recvChats, accountSettings: authState.creds.accountSettings } : undefined, logger @@ -290,7 +288,8 @@ export const makeChatsSocket = (config: SocketConfig) => { } const resyncAppState = async(collections: readonly WAPatchName[], recvChats: InitialReceivedChatsState | undefined) => { - const { appStateChunk, onMutation } = newAppStateChunkHandler(collections, recvChats) + const startedBuffer = ev.buffer() + const { onMutation } = newAppStateChunkHandler(recvChats) // we use this to determine which events to fire // otherwise when we resync from scratch -- all notifications will fire const initialVersionMap: { [T in WAPatchName]?: number } = { } @@ -402,9 +401,10 @@ export const makeChatsSocket = (config: SocketConfig) => { } ) - processSyncActionsLocal(appStateChunk.updates) - - return appStateChunk + // flush everything if we started the buffer here + if(startedBuffer) { + await ev.flush() + } } /** @@ -517,14 +517,6 @@ export const makeChatsSocket = (config: SocketConfig) => { ) } - const processSyncActionsLocal = (actions: SyncActionUpdates) => { - emitEventsFromMap(syncActionUpdatesToEventMap(actions)) - // resend available presence to update name on servers - if(actions.credsUpdates.me?.name && markOnlineOnConnect) { - sendPresenceUpdate('available') - } - } - const appPatch = async(patchCreate: WAPatchCreate) => { const name = patchCreate.type const myAppStateKeyId = authState.creds.myAppStateKeyId @@ -594,7 +586,7 @@ export const makeChatsSocket = (config: SocketConfig) => { ) if(config.emitOwnEvents) { - const { appStateChunk, onMutation } = newAppStateChunkHandler([name], undefined) + const { onMutation } = newAppStateChunkHandler(undefined) await decodePatches( name, [{ ...encodeResult.patch, version: { version: encodeResult.state.version }, }], @@ -604,7 +596,6 @@ export const makeChatsSocket = (config: SocketConfig) => { undefined, logger, ) - processSyncActionsLocal(appStateChunk.updates) } } diff --git a/src/Socket/socket.ts b/src/Socket/socket.ts index 86f203d..4f34e7e 100644 --- a/src/Socket/socket.ts +++ b/src/Socket/socket.ts @@ -529,7 +529,7 @@ export const makeSocket = ({ ev.on('creds.update', update => { const name = update.me?.name // if name has just been received - if(!creds.me?.name && name) { + if(creds.me?.name !== name) { logger.info({ name }, 'updated pushName') sendNode({ tag: 'presence', diff --git a/src/Types/Chat.ts b/src/Types/Chat.ts index f51f86f..def05c1 100644 --- a/src/Types/Chat.ts +++ b/src/Types/Chat.ts @@ -1,7 +1,6 @@ import type { proto } from '../../WAProto' -import type { AccountSettings, AuthenticationCreds } from './Auth' -import { Contact } from './Contact' -import type { MinimalMessage, WAMessageUpdate } from './Message' +import type { AccountSettings } from './Auth' +import type { MinimalMessage } from './Message' /** set of statuses visible to other people; see updatePresence() in WhatsAppWeb.Send */ export type WAPresence = 'unavailable' | 'available' | 'composing' | 'recording' | 'paused' @@ -26,20 +25,6 @@ export type ChatMutation = { index: string[] } -export type SyncActionUpdates = { - credsUpdates: Partial - chatUpdates: { [jid: string]: Partial } - chatDeletes: string[] - contactUpserts: { [jid: string]: Contact } - msgUpdates: { [jid: string]: WAMessageUpdate } - msgDeletes: proto.IMessageKey[] -} - -export type AppStateChunk = { - updates: SyncActionUpdates - collectionsToHandle: WAPatchName[] -} - export type WAPatchCreate = { syncAction: proto.ISyncActionValue index: string[] diff --git a/src/Utils/chat-utils.ts b/src/Utils/chat-utils.ts index 3b8cd46..d026361 100644 --- a/src/Utils/chat-utils.ts +++ b/src/Utils/chat-utils.ts @@ -1,7 +1,7 @@ import { Boom } from '@hapi/boom' import type { Logger } from 'pino' import { proto } from '../../WAProto' -import { AppStateChunk, AuthenticationCreds, BaileysEventMap, Chat, ChatModification, ChatMutation, Contact, InitialAppStateSyncOptions, LastMessageList, LTHashState, SyncActionUpdates, WAPatchCreate, WAPatchName } from '../Types' +import { BaileysEventEmitter, ChatModification, ChatMutation, Contact, InitialAppStateSyncOptions, LastMessageList, LTHashState, WAPatchCreate, WAPatchName } from '../Types' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, isJidGroup, jidNormalizedUser } from '../WABinary' import { aesDecrypt, aesEncrypt, hkdf, hmacSign } from './crypto' import { toNumber } from './generics' @@ -115,18 +115,6 @@ const generatePatchMac = (snapshotMac: Uint8Array, valueMacs: Uint8Array[], vers export const newLTHashState = (): LTHashState => ({ version: 0, hash: Buffer.alloc(128), indexValueMap: {} }) -export const newAppStateChunk = (collectionsToHandle: readonly WAPatchName[]): AppStateChunk => ({ - updates: { - chatUpdates: { }, - credsUpdates: { }, - chatDeletes: [], - contactUpserts: { }, - msgDeletes: [], - msgUpdates: { } - }, - collectionsToHandle: [...collectionsToHandle], -}) - export const encodeSyncdPatch = async( { type, index, syncAction, apiVersion, operation }: WAPatchCreate, myAppStateKeyId: string, @@ -602,7 +590,7 @@ export const chatModificationToAppPatch = ( export const processSyncAction = ( syncAction: ChatMutation, - { credsUpdates, chatUpdates, chatDeletes, contactUpserts, msgDeletes, msgUpdates }: SyncActionUpdates, + ev: BaileysEventEmitter, me: Contact, initialSyncOpts?: InitialAppStateSyncOptions, logger?: Logger, @@ -612,11 +600,18 @@ export const processSyncAction = ( const accountSettings = initialSyncOpts?.accountSettings const { syncAction: { value: action }, index: [_, id, msgId, fromMe] } = syncAction - const update: Partial = { id } if(action?.muteAction) { - update.mute = action.muteAction?.muted ? - toNumber(action.muteAction!.muteEndTimestamp!) : - undefined + ev.emit( + 'chats.update', + [ + { + id, + mute: action.muteAction?.muted ? + toNumber(action.muteAction!.muteEndTimestamp!) : + undefined + } + ] + ) } else if(action?.archiveChatAction) { // okay so we've to do some annoying computation here // when we're initially syncing the app state @@ -637,9 +632,9 @@ export const processSyncAction = ( // basically we don't need to fire an "archive" update if the chat is being marked unarchvied // this only applies for the initial sync if(isInitialSync && !archiveAction.archived) { - delete update.archive + ev.emit('chats.update', [{ id, archive: false }]) } else { - update.archive = !!archiveAction?.archived + ev.emit('chats.update', [{ id, archive: !!archiveAction?.archived }]) } } } else if(action?.markChatAsReadAction) { @@ -652,97 +647,50 @@ export const processSyncAction = ( // because the chat is read by default // this only applies for the initial sync if(isInitialSync && markReadAction.read) { - delete update.unreadCount + ev.emit('chats.update', [{ id, unreadCount: null }]) } else { - update.unreadCount = !!markReadAction?.read ? 0 : -1 + ev.emit('chats.update', [{ id, unreadCount: !!markReadAction?.read ? 0 : -1 }]) } } } else if(action?.clearChatAction) { - msgDeletes.push({ - remoteJid: id, - id: msgId, - fromMe: fromMe === '1' - }) + ev.emit('messages.delete', { keys: [ + { + remoteJid: id, + id: msgId, + fromMe: fromMe === '1' + } + ] }) } else if(action?.contactAction) { - contactUpserts[id] = { - ...(contactUpserts[id] || {}), - id, - name: action.contactAction!.fullName - } + ev.emit('contacts.upsert', [{ id, name: action.contactAction!.fullName }]) } else if(action?.pushNameSetting) { if(me?.name !== action?.pushNameSetting) { - credsUpdates.me = { ...me, name: action?.pushNameSetting?.name! } + ev.emit('creds.update', { me: { ...me, name: action?.pushNameSetting?.name! } }) } } else if(action?.pinAction) { - update.pin = action.pinAction?.pinned ? toNumber(action.timestamp) : null + ev.emit('chats.update', [{ id, pin: action.pinAction?.pinned ? toNumber(action.timestamp) : null }]) } else if(action?.unarchiveChatsSetting) { const unarchiveChats = !!action.unarchiveChatsSetting.unarchiveChats - credsUpdates.accountSettings = { unarchiveChats } + ev.emit('creds.update', { accountSettings: { unarchiveChats } }) logger.info(`archive setting updated => '${action.unarchiveChatsSetting.unarchiveChats}'`) accountSettings.unarchiveChats = unarchiveChats } else if(action?.starAction) { - const uqId = `${id},${msgId}` - const update = msgUpdates[uqId] || { - key: { remoteJid: id, id: msgId, fromMe: fromMe === '1' }, - update: { } - } - - update.update.starred = !!action.starAction?.starred - - msgUpdates[uqId] = update + ev.emit('messages.update', [ + { + key: { remoteJid: id, id: msgId, fromMe: fromMe === '1' }, + update: { starred: !!action.starAction?.starred } + } + ]) } else if(action?.deleteChatAction) { - chatDeletes.push(id) + ev.emit('chats.delete', [id]) } else { logger.warn({ syncAction, id }, 'unprocessable update') } - if(Object.keys(update).length > 1) { - chatUpdates[update.id] = { - ...(chatUpdates[update.id] || {}), - ...update - } - } else if(chatUpdates[update.id]) { - // remove if the update got cancelled - logger?.debug({ id: update.id }, 'cancelling update') - delete chatUpdates[update.id] - } - function isValidPatchBasedOnMessageRange(id: string, msgRange: proto.ISyncActionMessageRange) { const chat = recvChats?.[id] const lastMsgTimestamp = msgRange.lastMessageTimestamp || msgRange.lastSystemMessageTimestamp || 0 const chatLastMsgTimestamp = chat?.lastMsgRecvTimestamp || 0 return lastMsgTimestamp >= chatLastMsgTimestamp } -} - -export const syncActionUpdatesToEventMap = ( - { credsUpdates, chatUpdates, chatDeletes, contactUpserts, msgDeletes, msgUpdates }: SyncActionUpdates, -) => { - const map: Partial> = { } - if(Object.keys(credsUpdates).length) { - map['creds.update'] = credsUpdates - } - - if(Object.values(chatUpdates).length) { - map['chats.update'] = Object.values(chatUpdates) - } - - if(chatDeletes.length) { - map['chats.delete'] = chatDeletes - } - - if(Object.values(contactUpserts).length) { - map['contacts.upsert'] = Object.values(contactUpserts) - } - - if(msgDeletes.length) { - map['messages.delete'] = { keys: msgDeletes } - } - - if(Object.keys(msgUpdates).length) { - map['messages.update'] = Object.values(msgUpdates) - } - - return map -} +} \ No newline at end of file diff --git a/src/Utils/event-buffer.ts b/src/Utils/event-buffer.ts index afc1c89..5e4bee8 100644 --- a/src/Utils/event-buffer.ts +++ b/src/Utils/event-buffer.ts @@ -23,8 +23,11 @@ type BufferableEvent = typeof BUFFERABLE_EVENT[number] const BUFFERABLE_EVENT_SET = new Set(BUFFERABLE_EVENT) type BaileysBufferableEventEmitter = BaileysEventEmitter & { - /** starts buffering events, call flush() to release them */ - buffer(): void + /** + * starts buffering events, call flush() to release them + * @returns true if buffering just started, false if it was already buffering + * */ + buffer(): boolean /** flushes all buffered events */ flush(): Promise /** waits for the task to complete, before releasing the buffer */ @@ -61,8 +64,11 @@ export const makeEventBuffer = ( } }, buffer() { - logger.trace('buffering events') - isBuffering = true + if(!isBuffering) { + logger.trace('buffering events') + isBuffering = true + return true + } }, async flush() { if(!isBuffering) { @@ -349,6 +355,14 @@ function flush(data: BufferedEventData, ev: BaileysEventEmitter) { } function concatChats>(a: C, b: C) { + if(b.unreadCount === null) { + // neutralize unread counter + if(a.unreadCount < 0) { + a.unreadCount = undefined + b.unreadCount = undefined + } + } + if(typeof a.unreadCount !== 'undefined' && typeof b.unreadCount !== 'undefined') { b = { ...b } if(b.unreadCount >= 0) {