refactor: use event-buffer for state sync resyncs

This commit is contained in:
Adhiraj Singh
2022-06-27 14:51:26 +05:30
parent 7421f55daa
commit eaf0f37d24
5 changed files with 68 additions and 130 deletions

View File

@@ -1,7 +1,7 @@
import { Boom } from '@hapi/boom' import { Boom } from '@hapi/boom'
import { proto } from '../../WAProto' import { proto } from '../../WAProto'
import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, InitialReceivedChatsState, LTHashState, MessageUpsertType, PresenceData, SocketConfig, SyncActionUpdates, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAMessage, WAPatchCreate, WAPatchName, WAPresence } from '../Types' import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, InitialReceivedChatsState, LTHashState, MessageUpsertType, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAMessage, WAPatchCreate, WAPatchName, WAPresence } from '../Types'
import { chatModificationToAppPatch, debouncedTimeout, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, isHistoryMsg, newAppStateChunk, newLTHashState, processSyncAction, syncActionUpdatesToEventMap } from '../Utils' import { chatModificationToAppPatch, debouncedTimeout, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, isHistoryMsg, newLTHashState, processSyncAction } from '../Utils'
import { makeMutex } from '../Utils/make-mutex' import { makeMutex } from '../Utils/make-mutex'
import processMessage from '../Utils/process-message' import processMessage from '../Utils/process-message'
import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary'
@@ -273,14 +273,12 @@ export const makeChatsSocket = (config: SocketConfig) => {
}) })
} }
const newAppStateChunkHandler = (collections: readonly WAPatchName[], recvChats: InitialReceivedChatsState | undefined) => { const newAppStateChunkHandler = (recvChats: InitialReceivedChatsState | undefined) => {
const appStateChunk = newAppStateChunk(collections)
return { return {
appStateChunk,
onMutation(mutation: ChatMutation) { onMutation(mutation: ChatMutation) {
processSyncAction( processSyncAction(
mutation, mutation,
appStateChunk.updates, ev,
authState.creds.me, authState.creds.me,
recvChats ? { recvChats, accountSettings: authState.creds.accountSettings } : undefined, recvChats ? { recvChats, accountSettings: authState.creds.accountSettings } : undefined,
logger logger
@@ -290,7 +288,8 @@ export const makeChatsSocket = (config: SocketConfig) => {
} }
const resyncAppState = async(collections: readonly WAPatchName[], recvChats: InitialReceivedChatsState | undefined) => { const resyncAppState = async(collections: readonly WAPatchName[], recvChats: InitialReceivedChatsState | undefined) => {
const { appStateChunk, onMutation } = newAppStateChunkHandler(collections, recvChats) const startedBuffer = ev.buffer()
const { onMutation } = newAppStateChunkHandler(recvChats)
// we use this to determine which events to fire // we use this to determine which events to fire
// otherwise when we resync from scratch -- all notifications will fire // otherwise when we resync from scratch -- all notifications will fire
const initialVersionMap: { [T in WAPatchName]?: number } = { } const initialVersionMap: { [T in WAPatchName]?: number } = { }
@@ -402,9 +401,10 @@ export const makeChatsSocket = (config: SocketConfig) => {
} }
) )
processSyncActionsLocal(appStateChunk.updates) // flush everything if we started the buffer here
if(startedBuffer) {
return appStateChunk await ev.flush()
}
} }
/** /**
@@ -517,14 +517,6 @@ export const makeChatsSocket = (config: SocketConfig) => {
) )
} }
const processSyncActionsLocal = (actions: SyncActionUpdates) => {
emitEventsFromMap(syncActionUpdatesToEventMap(actions))
// resend available presence to update name on servers
if(actions.credsUpdates.me?.name && markOnlineOnConnect) {
sendPresenceUpdate('available')
}
}
const appPatch = async(patchCreate: WAPatchCreate) => { const appPatch = async(patchCreate: WAPatchCreate) => {
const name = patchCreate.type const name = patchCreate.type
const myAppStateKeyId = authState.creds.myAppStateKeyId const myAppStateKeyId = authState.creds.myAppStateKeyId
@@ -594,7 +586,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
) )
if(config.emitOwnEvents) { if(config.emitOwnEvents) {
const { appStateChunk, onMutation } = newAppStateChunkHandler([name], undefined) const { onMutation } = newAppStateChunkHandler(undefined)
await decodePatches( await decodePatches(
name, name,
[{ ...encodeResult.patch, version: { version: encodeResult.state.version }, }], [{ ...encodeResult.patch, version: { version: encodeResult.state.version }, }],
@@ -604,7 +596,6 @@ export const makeChatsSocket = (config: SocketConfig) => {
undefined, undefined,
logger, logger,
) )
processSyncActionsLocal(appStateChunk.updates)
} }
} }

View File

@@ -529,7 +529,7 @@ export const makeSocket = ({
ev.on('creds.update', update => { ev.on('creds.update', update => {
const name = update.me?.name const name = update.me?.name
// if name has just been received // if name has just been received
if(!creds.me?.name && name) { if(creds.me?.name !== name) {
logger.info({ name }, 'updated pushName') logger.info({ name }, 'updated pushName')
sendNode({ sendNode({
tag: 'presence', tag: 'presence',

View File

@@ -1,7 +1,6 @@
import type { proto } from '../../WAProto' import type { proto } from '../../WAProto'
import type { AccountSettings, AuthenticationCreds } from './Auth' import type { AccountSettings } from './Auth'
import { Contact } from './Contact' import type { MinimalMessage } from './Message'
import type { MinimalMessage, WAMessageUpdate } from './Message'
/** set of statuses visible to other people; see updatePresence() in WhatsAppWeb.Send */ /** set of statuses visible to other people; see updatePresence() in WhatsAppWeb.Send */
export type WAPresence = 'unavailable' | 'available' | 'composing' | 'recording' | 'paused' export type WAPresence = 'unavailable' | 'available' | 'composing' | 'recording' | 'paused'
@@ -26,20 +25,6 @@ export type ChatMutation = {
index: string[] index: string[]
} }
export type SyncActionUpdates = {
credsUpdates: Partial<AuthenticationCreds>
chatUpdates: { [jid: string]: Partial<Chat> }
chatDeletes: string[]
contactUpserts: { [jid: string]: Contact }
msgUpdates: { [jid: string]: WAMessageUpdate }
msgDeletes: proto.IMessageKey[]
}
export type AppStateChunk = {
updates: SyncActionUpdates
collectionsToHandle: WAPatchName[]
}
export type WAPatchCreate = { export type WAPatchCreate = {
syncAction: proto.ISyncActionValue syncAction: proto.ISyncActionValue
index: string[] index: string[]

View File

@@ -1,7 +1,7 @@
import { Boom } from '@hapi/boom' import { Boom } from '@hapi/boom'
import type { Logger } from 'pino' import type { Logger } from 'pino'
import { proto } from '../../WAProto' import { proto } from '../../WAProto'
import { AppStateChunk, AuthenticationCreds, BaileysEventMap, Chat, ChatModification, ChatMutation, Contact, InitialAppStateSyncOptions, LastMessageList, LTHashState, SyncActionUpdates, WAPatchCreate, WAPatchName } from '../Types' import { BaileysEventEmitter, ChatModification, ChatMutation, Contact, InitialAppStateSyncOptions, LastMessageList, LTHashState, WAPatchCreate, WAPatchName } from '../Types'
import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, isJidGroup, jidNormalizedUser } from '../WABinary' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, isJidGroup, jidNormalizedUser } from '../WABinary'
import { aesDecrypt, aesEncrypt, hkdf, hmacSign } from './crypto' import { aesDecrypt, aesEncrypt, hkdf, hmacSign } from './crypto'
import { toNumber } from './generics' import { toNumber } from './generics'
@@ -115,18 +115,6 @@ const generatePatchMac = (snapshotMac: Uint8Array, valueMacs: Uint8Array[], vers
export const newLTHashState = (): LTHashState => ({ version: 0, hash: Buffer.alloc(128), indexValueMap: {} }) export const newLTHashState = (): LTHashState => ({ version: 0, hash: Buffer.alloc(128), indexValueMap: {} })
export const newAppStateChunk = (collectionsToHandle: readonly WAPatchName[]): AppStateChunk => ({
updates: {
chatUpdates: { },
credsUpdates: { },
chatDeletes: [],
contactUpserts: { },
msgDeletes: [],
msgUpdates: { }
},
collectionsToHandle: [...collectionsToHandle],
})
export const encodeSyncdPatch = async( export const encodeSyncdPatch = async(
{ type, index, syncAction, apiVersion, operation }: WAPatchCreate, { type, index, syncAction, apiVersion, operation }: WAPatchCreate,
myAppStateKeyId: string, myAppStateKeyId: string,
@@ -602,7 +590,7 @@ export const chatModificationToAppPatch = (
export const processSyncAction = ( export const processSyncAction = (
syncAction: ChatMutation, syncAction: ChatMutation,
{ credsUpdates, chatUpdates, chatDeletes, contactUpserts, msgDeletes, msgUpdates }: SyncActionUpdates, ev: BaileysEventEmitter,
me: Contact, me: Contact,
initialSyncOpts?: InitialAppStateSyncOptions, initialSyncOpts?: InitialAppStateSyncOptions,
logger?: Logger, logger?: Logger,
@@ -612,11 +600,18 @@ export const processSyncAction = (
const accountSettings = initialSyncOpts?.accountSettings const accountSettings = initialSyncOpts?.accountSettings
const { syncAction: { value: action }, index: [_, id, msgId, fromMe] } = syncAction const { syncAction: { value: action }, index: [_, id, msgId, fromMe] } = syncAction
const update: Partial<Chat> = { id }
if(action?.muteAction) { if(action?.muteAction) {
update.mute = action.muteAction?.muted ? ev.emit(
toNumber(action.muteAction!.muteEndTimestamp!) : 'chats.update',
undefined [
{
id,
mute: action.muteAction?.muted ?
toNumber(action.muteAction!.muteEndTimestamp!) :
undefined
}
]
)
} else if(action?.archiveChatAction) { } else if(action?.archiveChatAction) {
// okay so we've to do some annoying computation here // okay so we've to do some annoying computation here
// when we're initially syncing the app state // when we're initially syncing the app state
@@ -637,9 +632,9 @@ export const processSyncAction = (
// basically we don't need to fire an "archive" update if the chat is being marked unarchvied // basically we don't need to fire an "archive" update if the chat is being marked unarchvied
// this only applies for the initial sync // this only applies for the initial sync
if(isInitialSync && !archiveAction.archived) { if(isInitialSync && !archiveAction.archived) {
delete update.archive ev.emit('chats.update', [{ id, archive: false }])
} else { } else {
update.archive = !!archiveAction?.archived ev.emit('chats.update', [{ id, archive: !!archiveAction?.archived }])
} }
} }
} else if(action?.markChatAsReadAction) { } else if(action?.markChatAsReadAction) {
@@ -652,97 +647,50 @@ export const processSyncAction = (
// because the chat is read by default // because the chat is read by default
// this only applies for the initial sync // this only applies for the initial sync
if(isInitialSync && markReadAction.read) { if(isInitialSync && markReadAction.read) {
delete update.unreadCount ev.emit('chats.update', [{ id, unreadCount: null }])
} else { } else {
update.unreadCount = !!markReadAction?.read ? 0 : -1 ev.emit('chats.update', [{ id, unreadCount: !!markReadAction?.read ? 0 : -1 }])
} }
} }
} else if(action?.clearChatAction) { } else if(action?.clearChatAction) {
msgDeletes.push({ ev.emit('messages.delete', { keys: [
remoteJid: id, {
id: msgId, remoteJid: id,
fromMe: fromMe === '1' id: msgId,
}) fromMe: fromMe === '1'
}
] })
} else if(action?.contactAction) { } else if(action?.contactAction) {
contactUpserts[id] = { ev.emit('contacts.upsert', [{ id, name: action.contactAction!.fullName }])
...(contactUpserts[id] || {}),
id,
name: action.contactAction!.fullName
}
} else if(action?.pushNameSetting) { } else if(action?.pushNameSetting) {
if(me?.name !== action?.pushNameSetting) { if(me?.name !== action?.pushNameSetting) {
credsUpdates.me = { ...me, name: action?.pushNameSetting?.name! } ev.emit('creds.update', { me: { ...me, name: action?.pushNameSetting?.name! } })
} }
} else if(action?.pinAction) { } else if(action?.pinAction) {
update.pin = action.pinAction?.pinned ? toNumber(action.timestamp) : null ev.emit('chats.update', [{ id, pin: action.pinAction?.pinned ? toNumber(action.timestamp) : null }])
} else if(action?.unarchiveChatsSetting) { } else if(action?.unarchiveChatsSetting) {
const unarchiveChats = !!action.unarchiveChatsSetting.unarchiveChats const unarchiveChats = !!action.unarchiveChatsSetting.unarchiveChats
credsUpdates.accountSettings = { unarchiveChats } ev.emit('creds.update', { accountSettings: { unarchiveChats } })
logger.info(`archive setting updated => '${action.unarchiveChatsSetting.unarchiveChats}'`) logger.info(`archive setting updated => '${action.unarchiveChatsSetting.unarchiveChats}'`)
accountSettings.unarchiveChats = unarchiveChats accountSettings.unarchiveChats = unarchiveChats
} else if(action?.starAction) { } else if(action?.starAction) {
const uqId = `${id},${msgId}` ev.emit('messages.update', [
const update = msgUpdates[uqId] || { {
key: { remoteJid: id, id: msgId, fromMe: fromMe === '1' }, key: { remoteJid: id, id: msgId, fromMe: fromMe === '1' },
update: { } update: { starred: !!action.starAction?.starred }
} }
])
update.update.starred = !!action.starAction?.starred
msgUpdates[uqId] = update
} else if(action?.deleteChatAction) { } else if(action?.deleteChatAction) {
chatDeletes.push(id) ev.emit('chats.delete', [id])
} else { } else {
logger.warn({ syncAction, id }, 'unprocessable update') logger.warn({ syncAction, id }, 'unprocessable update')
} }
if(Object.keys(update).length > 1) {
chatUpdates[update.id] = {
...(chatUpdates[update.id] || {}),
...update
}
} else if(chatUpdates[update.id]) {
// remove if the update got cancelled
logger?.debug({ id: update.id }, 'cancelling update')
delete chatUpdates[update.id]
}
function isValidPatchBasedOnMessageRange(id: string, msgRange: proto.ISyncActionMessageRange) { function isValidPatchBasedOnMessageRange(id: string, msgRange: proto.ISyncActionMessageRange) {
const chat = recvChats?.[id] const chat = recvChats?.[id]
const lastMsgTimestamp = msgRange.lastMessageTimestamp || msgRange.lastSystemMessageTimestamp || 0 const lastMsgTimestamp = msgRange.lastMessageTimestamp || msgRange.lastSystemMessageTimestamp || 0
const chatLastMsgTimestamp = chat?.lastMsgRecvTimestamp || 0 const chatLastMsgTimestamp = chat?.lastMsgRecvTimestamp || 0
return lastMsgTimestamp >= chatLastMsgTimestamp return lastMsgTimestamp >= chatLastMsgTimestamp
} }
} }
export const syncActionUpdatesToEventMap = (
{ credsUpdates, chatUpdates, chatDeletes, contactUpserts, msgDeletes, msgUpdates }: SyncActionUpdates,
) => {
const map: Partial<BaileysEventMap<AuthenticationCreds>> = { }
if(Object.keys(credsUpdates).length) {
map['creds.update'] = credsUpdates
}
if(Object.values(chatUpdates).length) {
map['chats.update'] = Object.values(chatUpdates)
}
if(chatDeletes.length) {
map['chats.delete'] = chatDeletes
}
if(Object.values(contactUpserts).length) {
map['contacts.upsert'] = Object.values(contactUpserts)
}
if(msgDeletes.length) {
map['messages.delete'] = { keys: msgDeletes }
}
if(Object.keys(msgUpdates).length) {
map['messages.update'] = Object.values(msgUpdates)
}
return map
}

View File

@@ -23,8 +23,11 @@ type BufferableEvent = typeof BUFFERABLE_EVENT[number]
const BUFFERABLE_EVENT_SET = new Set<BaileysEvent>(BUFFERABLE_EVENT) const BUFFERABLE_EVENT_SET = new Set<BaileysEvent>(BUFFERABLE_EVENT)
type BaileysBufferableEventEmitter = BaileysEventEmitter & { type BaileysBufferableEventEmitter = BaileysEventEmitter & {
/** starts buffering events, call flush() to release them */ /**
buffer(): void * starts buffering events, call flush() to release them
* @returns true if buffering just started, false if it was already buffering
* */
buffer(): boolean
/** flushes all buffered events */ /** flushes all buffered events */
flush(): Promise<void> flush(): Promise<void>
/** waits for the task to complete, before releasing the buffer */ /** waits for the task to complete, before releasing the buffer */
@@ -61,8 +64,11 @@ export const makeEventBuffer = (
} }
}, },
buffer() { buffer() {
logger.trace('buffering events') if(!isBuffering) {
isBuffering = true logger.trace('buffering events')
isBuffering = true
return true
}
}, },
async flush() { async flush() {
if(!isBuffering) { if(!isBuffering) {
@@ -349,6 +355,14 @@ function flush(data: BufferedEventData, ev: BaileysEventEmitter) {
} }
function concatChats<C extends Partial<Chat>>(a: C, b: C) { function concatChats<C extends Partial<Chat>>(a: C, b: C) {
if(b.unreadCount === null) {
// neutralize unread counter
if(a.unreadCount < 0) {
a.unreadCount = undefined
b.unreadCount = undefined
}
}
if(typeof a.unreadCount !== 'undefined' && typeof b.unreadCount !== 'undefined') { if(typeof a.unreadCount !== 'undefined' && typeof b.unreadCount !== 'undefined') {
b = { ...b } b = { ...b }
if(b.unreadCount >= 0) { if(b.unreadCount >= 0) {