refactor!: cleaner message history sync

This is a breaking change,
1. three events (chats.set, contacts.set, messages.set) are now just one `messaging-history.set` event
2. no need to debounce for app state sync
3. added a new "conditional" chat update to allow for correct app state sync despite not having the chat available on hand
This commit is contained in:
Adhiraj Singh
2022-09-29 16:32:57 +05:30
parent e08dd10198
commit d0330d1863
16 changed files with 600 additions and 309 deletions

View File

@@ -1,7 +1,8 @@
import { Boom } from '@hapi/boom'
import { proto } from '../../WAProto'
import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, InitialReceivedChatsState, LTHashState, MessageUpsertType, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAMessage, WAPatchCreate, WAPatchName, WAPresence } from '../Types'
import { chatModificationToAppPatch, debouncedTimeout, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, isHistoryMsg, newLTHashState, processSyncAction } from '../Utils'
import { PROCESSABLE_HISTORY_TYPES } from '../Defaults'
import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, LTHashState, MessageUpsertType, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAMessage, WAPatchCreate, WAPatchName, WAPresence } from '../Types'
import { chatModificationToAppPatch, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, getHistoryMsg, newLTHashState, processSyncAction } from '../Utils'
import { makeMutex } from '../Utils/make-mutex'
import processMessage from '../Utils/process-message'
import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary'
@@ -9,10 +10,8 @@ import { makeSocket } from './socket'
const MAX_SYNC_ATTEMPTS = 5
const APP_STATE_SYNC_TIMEOUT_MS = 10_000
export const makeChatsSocket = (config: SocketConfig) => {
const { logger, markOnlineOnConnect, downloadHistory, fireInitQueries } = config
const { logger, markOnlineOnConnect, shouldSyncHistoryMessage, fireInitQueries } = config
const sock = makeSocket(config)
const {
ev,
@@ -26,40 +25,8 @@ export const makeChatsSocket = (config: SocketConfig) => {
} = sock
let privacySettings: { [_: string]: string } | undefined
/** this mutex ensures that the notifications (receipts, messages etc.) are processed in order */
const processingMutex = makeMutex()
/** cache to ensure new history sync events do not have duplicate items */
const historyCache = new Set<string>()
let recvChats: InitialReceivedChatsState = { }
const appStateSyncTimeout = debouncedTimeout(
APP_STATE_SYNC_TIMEOUT_MS,
async() => {
if(!authState.creds.myAppStateKeyId) {
logger.warn('myAppStateKeyId not synced, bad link')
await logout('Incomplete app state key sync')
return
}
if(ws.readyState === ws.OPEN) {
logger.info(
{ recvChats: Object.keys(recvChats).length },
'doing initial app state sync'
)
await resyncMainAppState(recvChats)
const accountSyncCounter = (authState.creds.accountSyncCounter || 0) + 1
ev.emit('creds.update', { accountSyncCounter })
} else {
logger.warn('connection closed before app state sync')
}
historyCache.clear()
recvChats = { }
}
)
/** helper function to fetch the given app state sync key */
const getAppStateSyncKey = async(keyId: string) => {
@@ -310,22 +277,22 @@ export const makeChatsSocket = (config: SocketConfig) => {
})
}
const newAppStateChunkHandler = (recvChats: InitialReceivedChatsState | undefined) => {
const newAppStateChunkHandler = (isInitialSync: boolean) => {
return {
onMutation(mutation: ChatMutation) {
processSyncAction(
mutation,
ev,
authState.creds.me!,
recvChats ? { recvChats, accountSettings: authState.creds.accountSettings } : undefined,
isInitialSync ? { accountSettings: authState.creds.accountSettings } : undefined,
logger
)
}
}
}
const resyncAppState = ev.createBufferedFunction(async(collections: readonly WAPatchName[], recvChats: InitialReceivedChatsState | undefined) => {
const { onMutation } = newAppStateChunkHandler(recvChats)
const resyncAppState = ev.createBufferedFunction(async(collections: readonly WAPatchName[], isInitialSync: boolean) => {
const { onMutation } = newAppStateChunkHandler(isInitialSync)
// we use this to determine which events to fire
// otherwise when we resync from scratch -- all notifications will fire
const initialVersionMap: { [T in WAPatchName]?: number } = { }
@@ -543,19 +510,6 @@ export const makeChatsSocket = (config: SocketConfig) => {
}
}
const resyncMainAppState = async(ctx?: InitialReceivedChatsState) => {
logger.debug('resyncing main app state')
await (
processingMutex.mutex(
() => resyncAppState(ALL_WA_PATCH_NAMES, ctx)
)
.catch(err => (
onUnexpectedError(err, 'main app sync')
))
)
}
const appPatch = async(patchCreate: WAPatchCreate) => {
const name = patchCreate.type
const myAppStateKeyId = authState.creds.myAppStateKeyId
@@ -572,7 +526,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
async() => {
logger.debug({ patch: patchCreate }, 'applying app patch')
await resyncAppState([name], undefined)
await resyncAppState([name], false)
const { [name]: currentSyncVersion } = await authState.keys.get('app-state-sync-version', [name])
initial = currentSyncVersion || newLTHashState()
@@ -625,7 +579,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
)
if(config.emitOwnEvents) {
const { onMutation } = newAppStateChunkHandler(undefined)
const { onMutation } = newAppStateChunkHandler(false)
await decodePatches(
name,
[{ ...encodeResult!.patch, version: { version: encodeResult!.state.version }, }],
@@ -726,33 +680,49 @@ export const makeChatsSocket = (config: SocketConfig) => {
}
// update our pushname too
if(msg.key.fromMe && authState.creds.me?.name !== msg.pushName) {
if(msg.key.fromMe && msg.pushName && authState.creds.me?.name !== msg.pushName) {
ev.emit('creds.update', { me: { ...authState.creds.me!, name: msg.pushName! } })
}
}
// process message and emit events
await processMessage(
msg,
{
downloadHistory,
ev,
historyCache,
recvChats,
creds: authState.creds,
keyStore: authState.keys,
logger,
options: config.options,
const historyMsg = getHistoryMsg(msg.message!)
const shouldProcessHistoryMsg = historyMsg
? (
shouldSyncHistoryMessage(historyMsg)
&& PROCESSABLE_HISTORY_TYPES.includes(historyMsg.syncType!)
)
: false
// we should have app state keys before we process any history
if(shouldProcessHistoryMsg) {
if(!authState.creds.myAppStateKeyId) {
logger.warn('myAppStateKeyId not synced, bad link')
await logout('Incomplete app state key sync')
return
}
)
const isAnyHistoryMsg = isHistoryMsg(msg.message!)
if(isAnyHistoryMsg) {
// we only want to sync app state once we've all the history
// restart the app state sync timeout
logger.debug('restarting app sync timeout')
appStateSyncTimeout.start()
}
await Promise.all([
(async() => {
if(shouldProcessHistoryMsg && !authState.creds.accountSyncCounter) {
logger.info('doing initial app state sync')
await resyncAppState(ALL_WA_PATCH_NAMES, true)
const accountSyncCounter = (authState.creds.accountSyncCounter || 0) + 1
ev.emit('creds.update', { accountSyncCounter })
}
})(),
processMessage(
msg,
{
shouldProcessHistoryMsg,
ev,
creds: authState.creds,
keyStore: authState.keys,
logger,
options: config.options,
}
)
])
})
ws.on('CB:presence', handlePresenceUpdate)
@@ -814,7 +784,6 @@ export const makeChatsSocket = (config: SocketConfig) => {
updateBlockStatus,
getBusinessProfile,
resyncAppState,
chatModify,
resyncMainAppState,
chatModify
}
}

View File

@@ -2,7 +2,7 @@
import { proto } from '../../WAProto'
import { KEY_BUNDLE_TYPE, MIN_PREKEY_COUNT } from '../Defaults'
import { MessageReceiptType, MessageRelayOptions, MessageUserReceipt, SocketConfig, WACallEvent, WAMessageKey, WAMessageStubType, WAPatchName } from '../Types'
import { decodeMediaRetryNode, decodeMessageStanza, delay, encodeBigEndian, encodeSignedDeviceIdentity, getCallStatusFromNode, getNextPreKeys, getStatusFromReceiptType, isHistoryMsg, unixTimestampSeconds, xmppPreKey, xmppSignedPreKey } from '../Utils'
import { decodeMediaRetryNode, decodeMessageStanza, delay, encodeBigEndian, encodeSignedDeviceIdentity, getCallStatusFromNode, getHistoryMsg, getNextPreKeys, getStatusFromReceiptType, unixTimestampSeconds, xmppPreKey, xmppSignedPreKey } from '../Utils'
import { makeMutex } from '../Utils/make-mutex'
import { cleanMessage } from '../Utils/process-message'
import { areJidsSameUser, BinaryNode, getAllBinaryNodeChildren, getBinaryNodeChild, getBinaryNodeChildren, isJidGroup, isJidUser, jidDecode, jidNormalizedUser, S_WHATSAPP_NET } from '../WABinary'
@@ -286,7 +286,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
const update = getBinaryNodeChild(node, 'collection')
if(update) {
const name = update.attrs.name as WAPatchName
await resyncAppState([name], undefined)
await resyncAppState([name], false)
}
break
@@ -529,7 +529,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
// send ack for history message
const isAnyHistoryMsg = isHistoryMsg(msg.message!)
const isAnyHistoryMsg = getHistoryMsg(msg.message!)
if(isAnyHistoryMsg) {
const jid = jidNormalizedUser(msg.key.remoteJid!)
await sendReceipt(jid, undefined, [msg.key.id!], 'hist_sync')
@@ -618,6 +618,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
logger.info(`handled ${offlineNotifs} offline messages/notifications`)
await ev.flush()
ev.emit('connection.update', { receivedPendingNotifications: true })
})

View File

@@ -535,7 +535,7 @@ export const makeSocket = ({
const name = update.me?.name
// if name has just been received
if(creds.me?.name !== name) {
logger.info({ name }, 'updated pushName')
logger.debug({ name }, 'updated pushName')
sendNode({
tag: 'presence',
attrs: { name: name! }