diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index 7c602bf..c811dad 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -1,6 +1,6 @@ import { Boom } from '@hapi/boom' import { proto } from '../../WAProto' -import { AppStateChunk, ChatModification, ChatMutation, LTHashState, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAPatchCreate, WAPatchName, WAPresence } from '../Types' +import { ALL_WA_PATCH_NAMES, AppStateChunk, ChatModification, ChatMutation, InitialReceivedChatsState, LTHashState, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAPatchCreate, WAPatchName, WAPresence } from '../Types' import { chatModificationToAppPatch, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, newLTHashState, processSyncActions } from '../Utils' import { makeMutex } from '../Utils/make-mutex' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary' @@ -228,7 +228,7 @@ export const makeChatsSocket = (config: SocketConfig) => { }) } - const resyncAppState = async(collections: WAPatchName[]) => { + const resyncAppState = async(collections: readonly WAPatchName[], ctx: InitialReceivedChatsState | undefined) => { const appStateChunk: AppStateChunk = { totalMutations: [], collectionsToHandle: [] } // we use this to determine which events to fire // otherwise when we resync from scratch -- all notifications will fire @@ -344,7 +344,7 @@ export const makeChatsSocket = (config: SocketConfig) => { } ) - processSyncActionsLocal(appStateChunk.totalMutations) + processSyncActionsLocal(appStateChunk.totalMutations, ctx) return appStateChunk } @@ -446,18 +446,12 @@ export const makeChatsSocket = (config: SocketConfig) => { } } - const resyncMainAppState = async() => { + const resyncMainAppState = async(ctx?: InitialReceivedChatsState) => { logger.debug('resyncing main app state') await ( mutationMutex.mutex( - () => resyncAppState([ - 'critical_block', - 'critical_unblock_low', - 'regular_high', - 'regular_low', - 'regular' - ]) + () => resyncAppState(ALL_WA_PATCH_NAMES, ctx) ) .catch(err => ( onUnexpectedError(err, 'main app sync') @@ -465,8 +459,13 @@ export const makeChatsSocket = (config: SocketConfig) => { ) } - const processSyncActionsLocal = (actions: ChatMutation[]) => { - const events = processSyncActions(actions, authState.creds.me!, logger) + const processSyncActionsLocal = (actions: ChatMutation[], recvChats: InitialReceivedChatsState | undefined) => { + const events = processSyncActions( + actions, + authState.creds.me!, + recvChats ? { recvChats, accountSettings: authState.creds.accountSettings } : undefined, + logger + ) emitEventsFromMap(events) // resend available presence to update name on servers if(events['creds.update']?.me?.name && markOnlineOnConnect) { @@ -490,7 +489,7 @@ export const makeChatsSocket = (config: SocketConfig) => { async() => { logger.debug({ patch: patchCreate }, 'applying app patch') - await resyncAppState([name]) + await resyncAppState([name], undefined) const { [name]: currentSyncVersion } = await authState.keys.get('app-state-sync-version', [name]) initial = currentSyncVersion || newLTHashState() @@ -544,7 +543,7 @@ export const makeChatsSocket = (config: SocketConfig) => { if(config.emitOwnEvents) { const result = await decodePatches(name, [{ ...encodeResult.patch, version: { version: encodeResult.state.version }, }], initial, getAppStateSyncKey) - processSyncActionsLocal(result.newMutations) + processSyncActionsLocal(result.newMutations, undefined) } } @@ -654,7 +653,7 @@ export const makeChatsSocket = (config: SocketConfig) => { if(update) { const name = update.attrs.name as WAPatchName mutationMutex.mutex(() => ( - resyncAppState([name]) + resyncAppState([name], undefined) .catch(err => logger.error({ trace: err.stack, node }, 'failed to sync state')) )) } diff --git a/src/Socket/messages-recv.ts b/src/Socket/messages-recv.ts index e4a27b6..712b5f1 100644 --- a/src/Socket/messages-recv.ts +++ b/src/Socket/messages-recv.ts @@ -1,7 +1,7 @@ import { proto } from '../../WAProto' import { KEY_BUNDLE_TYPE, MIN_PREKEY_COUNT } from '../Defaults' -import { BaileysEventMap, MessageReceiptType, MessageUserReceipt, SocketConfig, WACallEvent, WAMessageStubType } from '../Types' +import { BaileysEventMap, InitialReceivedChatsState, MessageReceiptType, MessageUserReceipt, SocketConfig, WACallEvent, WAMessageStubType } from '../Types' import { debouncedTimeout, decodeMediaRetryNode, decodeMessageStanza, delay, encodeBigEndian, generateSignalPubKey, getCallStatusFromNode, getNextPreKeys, getStatusFromReceiptType, normalizeMessageContent, unixTimestampSeconds, xmppPreKey, xmppSignedPreKey } from '../Utils' import { makeKeyedMutex, makeMutex } from '../Utils/make-mutex' import processMessage, { cleanMessage } from '../Utils/process-message' @@ -37,16 +37,28 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { /** this mutex ensures that each retryRequest will wait for the previous one to finish */ const retryMutex = makeMutex() + const historyCache = new Set() + let recvChats: InitialReceivedChatsState = { } + const appStateSyncTimeout = debouncedTimeout( 6_000, - () => ws.readyState === ws.OPEN && resyncMainAppState() + async() => { + logger.info( + { recvChats: Object.keys(recvChats).length }, + 'doing initial app state sync' + ) + if(ws.readyState === ws.OPEN) { + await resyncMainAppState(recvChats) + } + + historyCache.clear() + recvChats = { } + } ) const msgRetryMap = config.msgRetryCounterMap || { } const callOfferData: { [id: string]: WACallEvent } = { } - const historyCache = new Set() - let sendActiveReceipts = false const sendMessageAck = async({ tag, attrs }: BinaryNode, extraAttrs: BinaryNodeAttributes = { }) => { @@ -160,6 +172,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { { downloadHistory, historyCache, + recvChats, creds: authState.creds, keyStore: authState.keys, logger, diff --git a/src/Tests/test.app-state-sync.ts b/src/Tests/test.app-state-sync.ts new file mode 100644 index 0000000..55b5898 --- /dev/null +++ b/src/Tests/test.app-state-sync.ts @@ -0,0 +1,212 @@ +import { AccountSettings, ChatMutation, Contact, InitialAppStateSyncOptions } from '../Types' +import { unixTimestampSeconds } from '../Utils' +import { processSyncActions } from '../Utils/chat-utils' +import logger from '../Utils/logger' +import { jidEncode } from '../WABinary' + +describe('App State Sync Tests', () => { + + const me: Contact = { id: randomJid() } + // case when initial sync is off + it('should return archive=false event', () => { + const jid = randomJid() + const index = ['archive', jid] + + const CASES: ChatMutation[][] = [ + [ + { + index, + syncAction: { + value: { + archiveChatAction: { + archived: false, + messageRange: { + lastMessageTimestamp: unixTimestampSeconds() + } + } + } + } + } + ], + [ + { + index, + syncAction: { + value: { + archiveChatAction: { + archived: true, + messageRange: { + lastMessageTimestamp: unixTimestampSeconds() + } + } + } + } + }, + { + index, + syncAction: { + value: { + archiveChatAction: { + archived: false, + messageRange: { + lastMessageTimestamp: unixTimestampSeconds() + } + } + } + } + } + ] + ] + + for(const mutations of CASES) { + const events = processSyncActions(mutations, me, undefined, logger) + expect(events['chats.update']).toHaveLength(1) + const event = events['chats.update']?.[0] + expect(event.archive).toEqual(false) + } + }) + // case when initial sync is on + // and unarchiveChats = true + it('should not fire any archive event', () => { + const jid = randomJid() + const index = ['archive', jid] + const now = unixTimestampSeconds() + + const CASES: ChatMutation[][] = [ + [ + { + index, + syncAction: { + value: { + archiveChatAction: { + archived: true, + messageRange: { + lastMessageTimestamp: now - 1 + } + } + } + } + } + ], + [ + { + index, + syncAction: { + value: { + archiveChatAction: { + archived: false, + messageRange: { + lastMessageTimestamp: now + 10 + } + } + } + } + } + ], + [ + { + index, + syncAction: { + value: { + archiveChatAction: { + archived: true, + messageRange: { + lastMessageTimestamp: now + 10 + } + } + } + } + }, + { + index, + syncAction: { + value: { + archiveChatAction: { + archived: false, + messageRange: { + lastMessageTimestamp: now + 11 + } + } + } + } + } + ], + ] + + const ctx: InitialAppStateSyncOptions = { + recvChats: { + [jid]: { lastMsgRecvTimestamp: now } + }, + accountSettings: { unarchiveChats: true } + } + + for(const mutations of CASES) { + const events = processSyncActions(mutations, me, ctx, logger) + expect(events['chats.update']?.length).toBeFalsy() + } + }) + + // case when initial sync is on + // with unarchiveChats = true & unarchiveChats = false + it('should fire archive=true events', () => { + const jid = randomJid() + const index = ['archive', jid] + const now = unixTimestampSeconds() + + const CASES: { settings: AccountSettings, mutations: ChatMutation[] }[] = [ + { + settings: { unarchiveChats: true }, + mutations: [ + { + index, + syncAction: { + value: { + archiveChatAction: { + archived: true, + messageRange: { + lastMessageTimestamp: now + } + } + } + } + } + ], + }, + { + settings: { unarchiveChats: false }, + mutations: [ + { + index, + syncAction: { + value: { + archiveChatAction: { + archived: true, + messageRange: { + lastMessageTimestamp: now - 10 + } + } + } + } + } + ], + } + ] + + for(const { mutations, settings } of CASES) { + const ctx: InitialAppStateSyncOptions = { + recvChats: { + [jid]: { lastMsgRecvTimestamp: now } + }, + accountSettings: settings + } + const events = processSyncActions(mutations, me, ctx, logger) + expect(events['chats.update']).toHaveLength(1) + const event = events['chats.update']?.[0] + expect(event.archive).toEqual(true) + } + }) +}) + +function randomJid() { + return jidEncode(Math.floor(Math.random() * 1000000), Math.random() < 0.5 ? 's.whatsapp.net' : 'g.us') +} \ No newline at end of file diff --git a/src/Types/Chat.ts b/src/Types/Chat.ts index 2cf5f85..3381477 100644 --- a/src/Types/Chat.ts +++ b/src/Types/Chat.ts @@ -1,10 +1,19 @@ import type { proto } from '../../WAProto' +import type { AccountSettings } from './Auth' import type { MinimalMessage } from './Message' /** set of statuses visible to other people; see updatePresence() in WhatsAppWeb.Send */ export type WAPresence = 'unavailable' | 'available' | 'composing' | 'recording' | 'paused' -export type WAPatchName = 'critical_block' | 'critical_unblock_low' | 'regular_low' | 'regular_high' | 'regular' +export const ALL_WA_PATCH_NAMES = [ + 'critical_block', + 'critical_unblock_low', + 'regular_high', + 'regular_low', + 'regular' +] as const + +export type WAPatchName = typeof ALL_WA_PATCH_NAMES[number] export interface PresenceData { lastKnownPresence: WAPresence @@ -65,4 +74,14 @@ export type ChatModification = markRead: boolean lastMessages: LastMessageList } | - { delete: true, lastMessages: LastMessageList } \ No newline at end of file + { delete: true, lastMessages: LastMessageList } + + +export type InitialReceivedChatsState = { + [jid: string]: { lastMsgRecvTimestamp: number } +} + +export type InitialAppStateSyncOptions = { + recvChats: InitialReceivedChatsState + accountSettings: AccountSettings +} diff --git a/src/Utils/generics.ts b/src/Utils/generics.ts index 4a84016..24b651f 100644 --- a/src/Utils/generics.ts +++ b/src/Utils/generics.ts @@ -86,7 +86,7 @@ export const encodeBigEndian = (e: number, t = 4) => { return a } -export const toNumber = (t: Long | number) => ((typeof t === 'object' && t) ? ('toNumber' in t ? t.toNumber() : (t as any).low) : t) +export const toNumber = (t: Long | number): number => ((typeof t === 'object' && t) ? ('toNumber' in t ? t.toNumber() : (t as any).low) : t) export function shallowChanges (old: T, current: T, { lookForDeletedKeys }: {lookForDeletedKeys: boolean}): Partial { const changes: Partial = {} diff --git a/src/Utils/history.ts b/src/Utils/history.ts index f26a583..1f8ea46 100644 --- a/src/Utils/history.ts +++ b/src/Utils/history.ts @@ -1,8 +1,9 @@ import { promisify } from 'util' import { inflate } from 'zlib' import { proto } from '../../WAProto' -import { Chat, Contact } from '../Types' +import { Chat, Contact, InitialReceivedChatsState } from '../Types' import { isJidUser } from '../WABinary' +import { toNumber } from './generics' import { downloadContentFromMessage } from './messages-media' const inflatePromise = promisify(inflate) @@ -21,7 +22,11 @@ export const downloadHistory = async(msg: proto.IHistorySyncNotification) => { return syncData } -export const processHistoryMessage = (item: proto.IHistorySync, historyCache: Set) => { +export const processHistoryMessage = ( + item: proto.IHistorySync, + historyCache: Set, + recvChats: InitialReceivedChatsState +) => { const messages: proto.IWebMessageInfo[] = [] const contacts: Contact[] = [] const chats: Chat[] = [] @@ -40,6 +45,13 @@ export const processHistoryMessage = (item: proto.IHistorySync, historyCache: Se const uqId = `${message.key.remoteJid}:${message.key.id}` if(!historyCache.has(uqId)) { messages.push(message) + + const curItem = recvChats[message.key.remoteJid] + const timestamp = toNumber(message.messageTimestamp) + if(!message.key.fromMe && (!curItem || timestamp > curItem.lastMsgRecvTimestamp)) { + recvChats[message.key.remoteJid] = { lastMsgRecvTimestamp: timestamp } + } + historyCache.add(uqId) } } @@ -81,7 +93,11 @@ export const processHistoryMessage = (item: proto.IHistorySync, historyCache: Se } } -export const downloadAndProcessHistorySyncNotification = async(msg: proto.IHistorySyncNotification, historyCache: Set) => { +export const downloadAndProcessHistorySyncNotification = async( + msg: proto.IHistorySyncNotification, + historyCache: Set, + recvChats: InitialReceivedChatsState +) => { const historyMsg = await downloadHistory(msg) - return processHistoryMessage(historyMsg, historyCache) + return processHistoryMessage(historyMsg, historyCache, recvChats) } \ No newline at end of file diff --git a/src/Utils/process-message.ts b/src/Utils/process-message.ts index 95135ec..7521001 100644 --- a/src/Utils/process-message.ts +++ b/src/Utils/process-message.ts @@ -1,11 +1,12 @@ import type { Logger } from 'pino' import { proto } from '../../WAProto' -import { AuthenticationCreds, BaileysEventMap, Chat, GroupMetadata, ParticipantAction, SignalKeyStoreWithTransaction, WAMessageStubType } from '../Types' +import { AuthenticationCreds, BaileysEventMap, Chat, GroupMetadata, InitialReceivedChatsState, ParticipantAction, SignalKeyStoreWithTransaction, WAMessageStubType } from '../Types' import { downloadAndProcessHistorySyncNotification, normalizeMessageContent, toNumber } from '../Utils' import { areJidsSameUser, jidNormalizedUser } from '../WABinary' type ProcessMessageContext = { historyCache: Set + recvChats: InitialReceivedChatsState downloadHistory: boolean creds: AuthenticationCreds keyStore: SignalKeyStoreWithTransaction @@ -38,7 +39,7 @@ export const cleanMessage = (message: proto.IWebMessageInfo, meId: string) => { const processMessage = async( message: proto.IWebMessageInfo, - { downloadHistory, historyCache, creds, keyStore, logger, treatCiphertextMessagesAsReal }: ProcessMessageContext + { downloadHistory, historyCache, recvChats, creds, keyStore, logger, treatCiphertextMessagesAsReal }: ProcessMessageContext ) => { const meId = creds.me!.id const { accountSettings } = creds @@ -78,7 +79,7 @@ const processMessage = async( logger?.info({ histNotification, id: message.key.id }, 'got history notification') if(downloadHistory) { - const { chats, contacts, messages, didProcess } = await downloadAndProcessHistorySyncNotification(histNotification, historyCache) + const { chats, contacts, messages, didProcess } = await downloadAndProcessHistorySyncNotification(histNotification, historyCache, recvChats) const isLatest = historyCache.size === 0 && !creds.processedHistoryMessages?.length if(chats.length) {