mirror of
https://github.com/FranP-code/Baileys.git
synced 2025-10-13 00:32:22 +00:00
feat: more accurately handle app state sync
This commit is contained in:
@@ -1,6 +1,6 @@
|
|||||||
import { Boom } from '@hapi/boom'
|
import { Boom } from '@hapi/boom'
|
||||||
import { proto } from '../../WAProto'
|
import { proto } from '../../WAProto'
|
||||||
import { AppStateChunk, ChatModification, ChatMutation, LTHashState, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAPatchCreate, WAPatchName, WAPresence } from '../Types'
|
import { ALL_WA_PATCH_NAMES, AppStateChunk, ChatModification, ChatMutation, InitialReceivedChatsState, LTHashState, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAPatchCreate, WAPatchName, WAPresence } from '../Types'
|
||||||
import { chatModificationToAppPatch, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, newLTHashState, processSyncActions } from '../Utils'
|
import { chatModificationToAppPatch, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, newLTHashState, processSyncActions } from '../Utils'
|
||||||
import { makeMutex } from '../Utils/make-mutex'
|
import { makeMutex } from '../Utils/make-mutex'
|
||||||
import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary'
|
import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary'
|
||||||
@@ -228,7 +228,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
const resyncAppState = async(collections: WAPatchName[]) => {
|
const resyncAppState = async(collections: readonly WAPatchName[], ctx: InitialReceivedChatsState | undefined) => {
|
||||||
const appStateChunk: AppStateChunk = { totalMutations: [], collectionsToHandle: [] }
|
const appStateChunk: AppStateChunk = { totalMutations: [], collectionsToHandle: [] }
|
||||||
// we use this to determine which events to fire
|
// we use this to determine which events to fire
|
||||||
// otherwise when we resync from scratch -- all notifications will fire
|
// otherwise when we resync from scratch -- all notifications will fire
|
||||||
@@ -344,7 +344,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
processSyncActionsLocal(appStateChunk.totalMutations)
|
processSyncActionsLocal(appStateChunk.totalMutations, ctx)
|
||||||
|
|
||||||
return appStateChunk
|
return appStateChunk
|
||||||
}
|
}
|
||||||
@@ -446,18 +446,12 @@ export const makeChatsSocket = (config: SocketConfig) => {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const resyncMainAppState = async() => {
|
const resyncMainAppState = async(ctx?: InitialReceivedChatsState) => {
|
||||||
logger.debug('resyncing main app state')
|
logger.debug('resyncing main app state')
|
||||||
|
|
||||||
await (
|
await (
|
||||||
mutationMutex.mutex(
|
mutationMutex.mutex(
|
||||||
() => resyncAppState([
|
() => resyncAppState(ALL_WA_PATCH_NAMES, ctx)
|
||||||
'critical_block',
|
|
||||||
'critical_unblock_low',
|
|
||||||
'regular_high',
|
|
||||||
'regular_low',
|
|
||||||
'regular'
|
|
||||||
])
|
|
||||||
)
|
)
|
||||||
.catch(err => (
|
.catch(err => (
|
||||||
onUnexpectedError(err, 'main app sync')
|
onUnexpectedError(err, 'main app sync')
|
||||||
@@ -465,8 +459,13 @@ export const makeChatsSocket = (config: SocketConfig) => {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
const processSyncActionsLocal = (actions: ChatMutation[]) => {
|
const processSyncActionsLocal = (actions: ChatMutation[], recvChats: InitialReceivedChatsState | undefined) => {
|
||||||
const events = processSyncActions(actions, authState.creds.me!, logger)
|
const events = processSyncActions(
|
||||||
|
actions,
|
||||||
|
authState.creds.me!,
|
||||||
|
recvChats ? { recvChats, accountSettings: authState.creds.accountSettings } : undefined,
|
||||||
|
logger
|
||||||
|
)
|
||||||
emitEventsFromMap(events)
|
emitEventsFromMap(events)
|
||||||
// resend available presence to update name on servers
|
// resend available presence to update name on servers
|
||||||
if(events['creds.update']?.me?.name && markOnlineOnConnect) {
|
if(events['creds.update']?.me?.name && markOnlineOnConnect) {
|
||||||
@@ -490,7 +489,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
|
|||||||
async() => {
|
async() => {
|
||||||
logger.debug({ patch: patchCreate }, 'applying app patch')
|
logger.debug({ patch: patchCreate }, 'applying app patch')
|
||||||
|
|
||||||
await resyncAppState([name])
|
await resyncAppState([name], undefined)
|
||||||
|
|
||||||
const { [name]: currentSyncVersion } = await authState.keys.get('app-state-sync-version', [name])
|
const { [name]: currentSyncVersion } = await authState.keys.get('app-state-sync-version', [name])
|
||||||
initial = currentSyncVersion || newLTHashState()
|
initial = currentSyncVersion || newLTHashState()
|
||||||
@@ -544,7 +543,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
|
|||||||
|
|
||||||
if(config.emitOwnEvents) {
|
if(config.emitOwnEvents) {
|
||||||
const result = await decodePatches(name, [{ ...encodeResult.patch, version: { version: encodeResult.state.version }, }], initial, getAppStateSyncKey)
|
const result = await decodePatches(name, [{ ...encodeResult.patch, version: { version: encodeResult.state.version }, }], initial, getAppStateSyncKey)
|
||||||
processSyncActionsLocal(result.newMutations)
|
processSyncActionsLocal(result.newMutations, undefined)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -654,7 +653,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
|
|||||||
if(update) {
|
if(update) {
|
||||||
const name = update.attrs.name as WAPatchName
|
const name = update.attrs.name as WAPatchName
|
||||||
mutationMutex.mutex(() => (
|
mutationMutex.mutex(() => (
|
||||||
resyncAppState([name])
|
resyncAppState([name], undefined)
|
||||||
.catch(err => logger.error({ trace: err.stack, node }, 'failed to sync state'))
|
.catch(err => logger.error({ trace: err.stack, node }, 'failed to sync state'))
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
|
|
||||||
import { proto } from '../../WAProto'
|
import { proto } from '../../WAProto'
|
||||||
import { KEY_BUNDLE_TYPE, MIN_PREKEY_COUNT } from '../Defaults'
|
import { KEY_BUNDLE_TYPE, MIN_PREKEY_COUNT } from '../Defaults'
|
||||||
import { BaileysEventMap, MessageReceiptType, MessageUserReceipt, SocketConfig, WACallEvent, WAMessageStubType } from '../Types'
|
import { BaileysEventMap, InitialReceivedChatsState, MessageReceiptType, MessageUserReceipt, SocketConfig, WACallEvent, WAMessageStubType } from '../Types'
|
||||||
import { debouncedTimeout, decodeMediaRetryNode, decodeMessageStanza, delay, encodeBigEndian, generateSignalPubKey, getCallStatusFromNode, getNextPreKeys, getStatusFromReceiptType, normalizeMessageContent, unixTimestampSeconds, xmppPreKey, xmppSignedPreKey } from '../Utils'
|
import { debouncedTimeout, decodeMediaRetryNode, decodeMessageStanza, delay, encodeBigEndian, generateSignalPubKey, getCallStatusFromNode, getNextPreKeys, getStatusFromReceiptType, normalizeMessageContent, unixTimestampSeconds, xmppPreKey, xmppSignedPreKey } from '../Utils'
|
||||||
import { makeKeyedMutex, makeMutex } from '../Utils/make-mutex'
|
import { makeKeyedMutex, makeMutex } from '../Utils/make-mutex'
|
||||||
import processMessage, { cleanMessage } from '../Utils/process-message'
|
import processMessage, { cleanMessage } from '../Utils/process-message'
|
||||||
@@ -37,16 +37,28 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
|||||||
/** this mutex ensures that each retryRequest will wait for the previous one to finish */
|
/** this mutex ensures that each retryRequest will wait for the previous one to finish */
|
||||||
const retryMutex = makeMutex()
|
const retryMutex = makeMutex()
|
||||||
|
|
||||||
|
const historyCache = new Set<string>()
|
||||||
|
let recvChats: InitialReceivedChatsState = { }
|
||||||
|
|
||||||
const appStateSyncTimeout = debouncedTimeout(
|
const appStateSyncTimeout = debouncedTimeout(
|
||||||
6_000,
|
6_000,
|
||||||
() => ws.readyState === ws.OPEN && resyncMainAppState()
|
async() => {
|
||||||
|
logger.info(
|
||||||
|
{ recvChats: Object.keys(recvChats).length },
|
||||||
|
'doing initial app state sync'
|
||||||
|
)
|
||||||
|
if(ws.readyState === ws.OPEN) {
|
||||||
|
await resyncMainAppState(recvChats)
|
||||||
|
}
|
||||||
|
|
||||||
|
historyCache.clear()
|
||||||
|
recvChats = { }
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
const msgRetryMap = config.msgRetryCounterMap || { }
|
const msgRetryMap = config.msgRetryCounterMap || { }
|
||||||
const callOfferData: { [id: string]: WACallEvent } = { }
|
const callOfferData: { [id: string]: WACallEvent } = { }
|
||||||
|
|
||||||
const historyCache = new Set<string>()
|
|
||||||
|
|
||||||
let sendActiveReceipts = false
|
let sendActiveReceipts = false
|
||||||
|
|
||||||
const sendMessageAck = async({ tag, attrs }: BinaryNode, extraAttrs: BinaryNodeAttributes = { }) => {
|
const sendMessageAck = async({ tag, attrs }: BinaryNode, extraAttrs: BinaryNodeAttributes = { }) => {
|
||||||
@@ -160,6 +172,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
|||||||
{
|
{
|
||||||
downloadHistory,
|
downloadHistory,
|
||||||
historyCache,
|
historyCache,
|
||||||
|
recvChats,
|
||||||
creds: authState.creds,
|
creds: authState.creds,
|
||||||
keyStore: authState.keys,
|
keyStore: authState.keys,
|
||||||
logger,
|
logger,
|
||||||
|
|||||||
212
src/Tests/test.app-state-sync.ts
Normal file
212
src/Tests/test.app-state-sync.ts
Normal file
@@ -0,0 +1,212 @@
|
|||||||
|
import { AccountSettings, ChatMutation, Contact, InitialAppStateSyncOptions } from '../Types'
|
||||||
|
import { unixTimestampSeconds } from '../Utils'
|
||||||
|
import { processSyncActions } from '../Utils/chat-utils'
|
||||||
|
import logger from '../Utils/logger'
|
||||||
|
import { jidEncode } from '../WABinary'
|
||||||
|
|
||||||
|
describe('App State Sync Tests', () => {
|
||||||
|
|
||||||
|
const me: Contact = { id: randomJid() }
|
||||||
|
// case when initial sync is off
|
||||||
|
it('should return archive=false event', () => {
|
||||||
|
const jid = randomJid()
|
||||||
|
const index = ['archive', jid]
|
||||||
|
|
||||||
|
const CASES: ChatMutation[][] = [
|
||||||
|
[
|
||||||
|
{
|
||||||
|
index,
|
||||||
|
syncAction: {
|
||||||
|
value: {
|
||||||
|
archiveChatAction: {
|
||||||
|
archived: false,
|
||||||
|
messageRange: {
|
||||||
|
lastMessageTimestamp: unixTimestampSeconds()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
[
|
||||||
|
{
|
||||||
|
index,
|
||||||
|
syncAction: {
|
||||||
|
value: {
|
||||||
|
archiveChatAction: {
|
||||||
|
archived: true,
|
||||||
|
messageRange: {
|
||||||
|
lastMessageTimestamp: unixTimestampSeconds()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
index,
|
||||||
|
syncAction: {
|
||||||
|
value: {
|
||||||
|
archiveChatAction: {
|
||||||
|
archived: false,
|
||||||
|
messageRange: {
|
||||||
|
lastMessageTimestamp: unixTimestampSeconds()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
]
|
||||||
|
|
||||||
|
for(const mutations of CASES) {
|
||||||
|
const events = processSyncActions(mutations, me, undefined, logger)
|
||||||
|
expect(events['chats.update']).toHaveLength(1)
|
||||||
|
const event = events['chats.update']?.[0]
|
||||||
|
expect(event.archive).toEqual(false)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
// case when initial sync is on
|
||||||
|
// and unarchiveChats = true
|
||||||
|
it('should not fire any archive event', () => {
|
||||||
|
const jid = randomJid()
|
||||||
|
const index = ['archive', jid]
|
||||||
|
const now = unixTimestampSeconds()
|
||||||
|
|
||||||
|
const CASES: ChatMutation[][] = [
|
||||||
|
[
|
||||||
|
{
|
||||||
|
index,
|
||||||
|
syncAction: {
|
||||||
|
value: {
|
||||||
|
archiveChatAction: {
|
||||||
|
archived: true,
|
||||||
|
messageRange: {
|
||||||
|
lastMessageTimestamp: now - 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
[
|
||||||
|
{
|
||||||
|
index,
|
||||||
|
syncAction: {
|
||||||
|
value: {
|
||||||
|
archiveChatAction: {
|
||||||
|
archived: false,
|
||||||
|
messageRange: {
|
||||||
|
lastMessageTimestamp: now + 10
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
[
|
||||||
|
{
|
||||||
|
index,
|
||||||
|
syncAction: {
|
||||||
|
value: {
|
||||||
|
archiveChatAction: {
|
||||||
|
archived: true,
|
||||||
|
messageRange: {
|
||||||
|
lastMessageTimestamp: now + 10
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
index,
|
||||||
|
syncAction: {
|
||||||
|
value: {
|
||||||
|
archiveChatAction: {
|
||||||
|
archived: false,
|
||||||
|
messageRange: {
|
||||||
|
lastMessageTimestamp: now + 11
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
]
|
||||||
|
|
||||||
|
const ctx: InitialAppStateSyncOptions = {
|
||||||
|
recvChats: {
|
||||||
|
[jid]: { lastMsgRecvTimestamp: now }
|
||||||
|
},
|
||||||
|
accountSettings: { unarchiveChats: true }
|
||||||
|
}
|
||||||
|
|
||||||
|
for(const mutations of CASES) {
|
||||||
|
const events = processSyncActions(mutations, me, ctx, logger)
|
||||||
|
expect(events['chats.update']?.length).toBeFalsy()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// case when initial sync is on
|
||||||
|
// with unarchiveChats = true & unarchiveChats = false
|
||||||
|
it('should fire archive=true events', () => {
|
||||||
|
const jid = randomJid()
|
||||||
|
const index = ['archive', jid]
|
||||||
|
const now = unixTimestampSeconds()
|
||||||
|
|
||||||
|
const CASES: { settings: AccountSettings, mutations: ChatMutation[] }[] = [
|
||||||
|
{
|
||||||
|
settings: { unarchiveChats: true },
|
||||||
|
mutations: [
|
||||||
|
{
|
||||||
|
index,
|
||||||
|
syncAction: {
|
||||||
|
value: {
|
||||||
|
archiveChatAction: {
|
||||||
|
archived: true,
|
||||||
|
messageRange: {
|
||||||
|
lastMessageTimestamp: now
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
settings: { unarchiveChats: false },
|
||||||
|
mutations: [
|
||||||
|
{
|
||||||
|
index,
|
||||||
|
syncAction: {
|
||||||
|
value: {
|
||||||
|
archiveChatAction: {
|
||||||
|
archived: true,
|
||||||
|
messageRange: {
|
||||||
|
lastMessageTimestamp: now - 10
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
for(const { mutations, settings } of CASES) {
|
||||||
|
const ctx: InitialAppStateSyncOptions = {
|
||||||
|
recvChats: {
|
||||||
|
[jid]: { lastMsgRecvTimestamp: now }
|
||||||
|
},
|
||||||
|
accountSettings: settings
|
||||||
|
}
|
||||||
|
const events = processSyncActions(mutations, me, ctx, logger)
|
||||||
|
expect(events['chats.update']).toHaveLength(1)
|
||||||
|
const event = events['chats.update']?.[0]
|
||||||
|
expect(event.archive).toEqual(true)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
function randomJid() {
|
||||||
|
return jidEncode(Math.floor(Math.random() * 1000000), Math.random() < 0.5 ? 's.whatsapp.net' : 'g.us')
|
||||||
|
}
|
||||||
@@ -1,10 +1,19 @@
|
|||||||
import type { proto } from '../../WAProto'
|
import type { proto } from '../../WAProto'
|
||||||
|
import type { AccountSettings } from './Auth'
|
||||||
import type { MinimalMessage } from './Message'
|
import type { MinimalMessage } from './Message'
|
||||||
|
|
||||||
/** set of statuses visible to other people; see updatePresence() in WhatsAppWeb.Send */
|
/** set of statuses visible to other people; see updatePresence() in WhatsAppWeb.Send */
|
||||||
export type WAPresence = 'unavailable' | 'available' | 'composing' | 'recording' | 'paused'
|
export type WAPresence = 'unavailable' | 'available' | 'composing' | 'recording' | 'paused'
|
||||||
|
|
||||||
export type WAPatchName = 'critical_block' | 'critical_unblock_low' | 'regular_low' | 'regular_high' | 'regular'
|
export const ALL_WA_PATCH_NAMES = [
|
||||||
|
'critical_block',
|
||||||
|
'critical_unblock_low',
|
||||||
|
'regular_high',
|
||||||
|
'regular_low',
|
||||||
|
'regular'
|
||||||
|
] as const
|
||||||
|
|
||||||
|
export type WAPatchName = typeof ALL_WA_PATCH_NAMES[number]
|
||||||
|
|
||||||
export interface PresenceData {
|
export interface PresenceData {
|
||||||
lastKnownPresence: WAPresence
|
lastKnownPresence: WAPresence
|
||||||
@@ -65,4 +74,14 @@ export type ChatModification =
|
|||||||
markRead: boolean
|
markRead: boolean
|
||||||
lastMessages: LastMessageList
|
lastMessages: LastMessageList
|
||||||
} |
|
} |
|
||||||
{ delete: true, lastMessages: LastMessageList }
|
{ delete: true, lastMessages: LastMessageList }
|
||||||
|
|
||||||
|
|
||||||
|
export type InitialReceivedChatsState = {
|
||||||
|
[jid: string]: { lastMsgRecvTimestamp: number }
|
||||||
|
}
|
||||||
|
|
||||||
|
export type InitialAppStateSyncOptions = {
|
||||||
|
recvChats: InitialReceivedChatsState
|
||||||
|
accountSettings: AccountSettings
|
||||||
|
}
|
||||||
|
|||||||
@@ -86,7 +86,7 @@ export const encodeBigEndian = (e: number, t = 4) => {
|
|||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
|
||||||
export const toNumber = (t: Long | number) => ((typeof t === 'object' && t) ? ('toNumber' in t ? t.toNumber() : (t as any).low) : t)
|
export const toNumber = (t: Long | number): number => ((typeof t === 'object' && t) ? ('toNumber' in t ? t.toNumber() : (t as any).low) : t)
|
||||||
|
|
||||||
export function shallowChanges <T>(old: T, current: T, { lookForDeletedKeys }: {lookForDeletedKeys: boolean}): Partial<T> {
|
export function shallowChanges <T>(old: T, current: T, { lookForDeletedKeys }: {lookForDeletedKeys: boolean}): Partial<T> {
|
||||||
const changes: Partial<T> = {}
|
const changes: Partial<T> = {}
|
||||||
|
|||||||
@@ -1,8 +1,9 @@
|
|||||||
import { promisify } from 'util'
|
import { promisify } from 'util'
|
||||||
import { inflate } from 'zlib'
|
import { inflate } from 'zlib'
|
||||||
import { proto } from '../../WAProto'
|
import { proto } from '../../WAProto'
|
||||||
import { Chat, Contact } from '../Types'
|
import { Chat, Contact, InitialReceivedChatsState } from '../Types'
|
||||||
import { isJidUser } from '../WABinary'
|
import { isJidUser } from '../WABinary'
|
||||||
|
import { toNumber } from './generics'
|
||||||
import { downloadContentFromMessage } from './messages-media'
|
import { downloadContentFromMessage } from './messages-media'
|
||||||
|
|
||||||
const inflatePromise = promisify(inflate)
|
const inflatePromise = promisify(inflate)
|
||||||
@@ -21,7 +22,11 @@ export const downloadHistory = async(msg: proto.IHistorySyncNotification) => {
|
|||||||
return syncData
|
return syncData
|
||||||
}
|
}
|
||||||
|
|
||||||
export const processHistoryMessage = (item: proto.IHistorySync, historyCache: Set<string>) => {
|
export const processHistoryMessage = (
|
||||||
|
item: proto.IHistorySync,
|
||||||
|
historyCache: Set<string>,
|
||||||
|
recvChats: InitialReceivedChatsState
|
||||||
|
) => {
|
||||||
const messages: proto.IWebMessageInfo[] = []
|
const messages: proto.IWebMessageInfo[] = []
|
||||||
const contacts: Contact[] = []
|
const contacts: Contact[] = []
|
||||||
const chats: Chat[] = []
|
const chats: Chat[] = []
|
||||||
@@ -40,6 +45,13 @@ export const processHistoryMessage = (item: proto.IHistorySync, historyCache: Se
|
|||||||
const uqId = `${message.key.remoteJid}:${message.key.id}`
|
const uqId = `${message.key.remoteJid}:${message.key.id}`
|
||||||
if(!historyCache.has(uqId)) {
|
if(!historyCache.has(uqId)) {
|
||||||
messages.push(message)
|
messages.push(message)
|
||||||
|
|
||||||
|
const curItem = recvChats[message.key.remoteJid]
|
||||||
|
const timestamp = toNumber(message.messageTimestamp)
|
||||||
|
if(!message.key.fromMe && (!curItem || timestamp > curItem.lastMsgRecvTimestamp)) {
|
||||||
|
recvChats[message.key.remoteJid] = { lastMsgRecvTimestamp: timestamp }
|
||||||
|
}
|
||||||
|
|
||||||
historyCache.add(uqId)
|
historyCache.add(uqId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -81,7 +93,11 @@ export const processHistoryMessage = (item: proto.IHistorySync, historyCache: Se
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export const downloadAndProcessHistorySyncNotification = async(msg: proto.IHistorySyncNotification, historyCache: Set<string>) => {
|
export const downloadAndProcessHistorySyncNotification = async(
|
||||||
|
msg: proto.IHistorySyncNotification,
|
||||||
|
historyCache: Set<string>,
|
||||||
|
recvChats: InitialReceivedChatsState
|
||||||
|
) => {
|
||||||
const historyMsg = await downloadHistory(msg)
|
const historyMsg = await downloadHistory(msg)
|
||||||
return processHistoryMessage(historyMsg, historyCache)
|
return processHistoryMessage(historyMsg, historyCache, recvChats)
|
||||||
}
|
}
|
||||||
@@ -1,11 +1,12 @@
|
|||||||
import type { Logger } from 'pino'
|
import type { Logger } from 'pino'
|
||||||
import { proto } from '../../WAProto'
|
import { proto } from '../../WAProto'
|
||||||
import { AuthenticationCreds, BaileysEventMap, Chat, GroupMetadata, ParticipantAction, SignalKeyStoreWithTransaction, WAMessageStubType } from '../Types'
|
import { AuthenticationCreds, BaileysEventMap, Chat, GroupMetadata, InitialReceivedChatsState, ParticipantAction, SignalKeyStoreWithTransaction, WAMessageStubType } from '../Types'
|
||||||
import { downloadAndProcessHistorySyncNotification, normalizeMessageContent, toNumber } from '../Utils'
|
import { downloadAndProcessHistorySyncNotification, normalizeMessageContent, toNumber } from '../Utils'
|
||||||
import { areJidsSameUser, jidNormalizedUser } from '../WABinary'
|
import { areJidsSameUser, jidNormalizedUser } from '../WABinary'
|
||||||
|
|
||||||
type ProcessMessageContext = {
|
type ProcessMessageContext = {
|
||||||
historyCache: Set<string>
|
historyCache: Set<string>
|
||||||
|
recvChats: InitialReceivedChatsState
|
||||||
downloadHistory: boolean
|
downloadHistory: boolean
|
||||||
creds: AuthenticationCreds
|
creds: AuthenticationCreds
|
||||||
keyStore: SignalKeyStoreWithTransaction
|
keyStore: SignalKeyStoreWithTransaction
|
||||||
@@ -38,7 +39,7 @@ export const cleanMessage = (message: proto.IWebMessageInfo, meId: string) => {
|
|||||||
|
|
||||||
const processMessage = async(
|
const processMessage = async(
|
||||||
message: proto.IWebMessageInfo,
|
message: proto.IWebMessageInfo,
|
||||||
{ downloadHistory, historyCache, creds, keyStore, logger, treatCiphertextMessagesAsReal }: ProcessMessageContext
|
{ downloadHistory, historyCache, recvChats, creds, keyStore, logger, treatCiphertextMessagesAsReal }: ProcessMessageContext
|
||||||
) => {
|
) => {
|
||||||
const meId = creds.me!.id
|
const meId = creds.me!.id
|
||||||
const { accountSettings } = creds
|
const { accountSettings } = creds
|
||||||
@@ -78,7 +79,7 @@ const processMessage = async(
|
|||||||
logger?.info({ histNotification, id: message.key.id }, 'got history notification')
|
logger?.info({ histNotification, id: message.key.id }, 'got history notification')
|
||||||
|
|
||||||
if(downloadHistory) {
|
if(downloadHistory) {
|
||||||
const { chats, contacts, messages, didProcess } = await downloadAndProcessHistorySyncNotification(histNotification, historyCache)
|
const { chats, contacts, messages, didProcess } = await downloadAndProcessHistorySyncNotification(histNotification, historyCache, recvChats)
|
||||||
const isLatest = historyCache.size === 0 && !creds.processedHistoryMessages?.length
|
const isLatest = historyCache.size === 0 && !creds.processedHistoryMessages?.length
|
||||||
|
|
||||||
if(chats.length) {
|
if(chats.length) {
|
||||||
|
|||||||
Reference in New Issue
Block a user