mirror of
https://github.com/FranP-code/Baileys.git
synced 2025-10-13 00:32:22 +00:00
refactor: cleaner + faster app state sync
This commit is contained in:
@@ -6,6 +6,8 @@ import { makeMessagesSocket } from "./messages-send";
|
|||||||
import makeMutex from "../Utils/make-mutex";
|
import makeMutex from "../Utils/make-mutex";
|
||||||
import { Boom } from "@hapi/boom";
|
import { Boom } from "@hapi/boom";
|
||||||
|
|
||||||
|
const MAX_SYNC_ATTEMPTS = 5
|
||||||
|
|
||||||
export const makeChatsSocket = (config: SocketConfig) => {
|
export const makeChatsSocket = (config: SocketConfig) => {
|
||||||
const { logger } = config
|
const { logger } = config
|
||||||
const sock = makeMessagesSocket(config)
|
const sock = makeMessagesSocket(config)
|
||||||
@@ -177,19 +179,21 @@ export const makeChatsSocket = (config: SocketConfig) => {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
const resyncAppStateInternal = async(collections: WAPatchName[], fromScratch: boolean = false, returnSnapshot: boolean = false) => {
|
const resyncAppState = async(collections: WAPatchName[], fromScratch: boolean = false) => {
|
||||||
if(fromScratch) returnSnapshot = true
|
|
||||||
|
|
||||||
const appStateChunk : AppStateChunk = {totalMutations: [], collectionsToHandle: []}
|
const appStateChunk : AppStateChunk = {totalMutations: [], collectionsToHandle: []}
|
||||||
|
|
||||||
await authState.keys.transaction(
|
await authState.keys.transaction(
|
||||||
async() => {
|
async() => {
|
||||||
const collectionsToHandle = new Set<string>(collections)
|
const collectionsToHandle = new Set<string>(collections)
|
||||||
|
// in case something goes wrong -- ensure we don't enter a loop that cannot be exited from
|
||||||
|
const attemptsMap = { } as { [T in WAPatchName]: number | undefined }
|
||||||
// keep executing till all collections are done
|
// keep executing till all collections are done
|
||||||
// sometimes a single patch request will not return all the patches (God knows why)
|
// sometimes a single patch request will not return all the patches (God knows why)
|
||||||
// so we fetch till they're all done (this is determined by the "has_more_patches" flag)
|
// so we fetch till they're all done (this is determined by the "has_more_patches" flag)
|
||||||
while(collectionsToHandle.size) {
|
while(collectionsToHandle.size) {
|
||||||
const states = { } as { [T in WAPatchName]: LTHashState }
|
const states = { } as { [T in WAPatchName]: LTHashState }
|
||||||
|
const nodes: BinaryNode[] = []
|
||||||
|
|
||||||
for(const name of collectionsToHandle) {
|
for(const name of collectionsToHandle) {
|
||||||
let state: LTHashState
|
let state: LTHashState
|
||||||
if(!fromScratch) {
|
if(!fromScratch) {
|
||||||
@@ -201,7 +205,18 @@ export const makeChatsSocket = (config: SocketConfig) => {
|
|||||||
states[name] = state
|
states[name] = state
|
||||||
|
|
||||||
logger.info(`resyncing ${name} from v${state.version}`)
|
logger.info(`resyncing ${name} from v${state.version}`)
|
||||||
|
|
||||||
|
nodes.push({
|
||||||
|
tag: 'collection',
|
||||||
|
attrs: {
|
||||||
|
name,
|
||||||
|
version: state.version.toString(),
|
||||||
|
// return snapshot if being synced from scratch
|
||||||
|
return_snapshot: (!state.version).toString()
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
const result = await query({
|
const result = await query({
|
||||||
tag: 'iq',
|
tag: 'iq',
|
||||||
attrs: {
|
attrs: {
|
||||||
@@ -213,16 +228,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
|
|||||||
{
|
{
|
||||||
tag: 'sync',
|
tag: 'sync',
|
||||||
attrs: { },
|
attrs: { },
|
||||||
content: collections.map(
|
content: nodes
|
||||||
(name) => ({
|
|
||||||
tag: 'collection',
|
|
||||||
attrs: {
|
|
||||||
name,
|
|
||||||
version: states[name].version.toString(),
|
|
||||||
return_snapshot: returnSnapshot ? 'true' : 'false'
|
|
||||||
}
|
|
||||||
})
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
})
|
})
|
||||||
@@ -231,29 +237,41 @@ export const makeChatsSocket = (config: SocketConfig) => {
|
|||||||
for(const key in decoded) {
|
for(const key in decoded) {
|
||||||
const name = key as WAPatchName
|
const name = key as WAPatchName
|
||||||
const { patches, hasMorePatches, snapshot } = decoded[name]
|
const { patches, hasMorePatches, snapshot } = decoded[name]
|
||||||
if(snapshot) {
|
try {
|
||||||
const newState = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey)
|
if(snapshot) {
|
||||||
states[name] = newState
|
const newState = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey)
|
||||||
|
states[name] = newState
|
||||||
|
|
||||||
|
logger.info(`restored state of ${name} from snapshot to v${newState.version}`)
|
||||||
|
|
||||||
logger.info(`restored state of ${name} from snapshot to v${newState.version}`)
|
await authState.keys.set({ 'app-state-sync-version': { [name]: newState } })
|
||||||
}
|
|
||||||
// only process if there are syncd patches
|
|
||||||
if(patches.length) {
|
|
||||||
const { newMutations, state: newState } = await decodePatches(name, patches, states[name], getAppStateSyncKey, true)
|
|
||||||
|
|
||||||
await authState.keys.set({ 'app-state-sync-version': { [name]: newState } })
|
|
||||||
|
|
||||||
logger.info(`synced ${name} to v${newState.version}`)
|
|
||||||
if(newMutations.length) {
|
|
||||||
logger.trace({ newMutations, name }, 'recv new mutations')
|
|
||||||
}
|
}
|
||||||
|
// only process if there are syncd patches
|
||||||
|
if(patches.length) {
|
||||||
|
const { newMutations, state: newState } = await decodePatches(name, patches, states[name], getAppStateSyncKey, true)
|
||||||
|
|
||||||
|
await authState.keys.set({ 'app-state-sync-version': { [name]: newState } })
|
||||||
|
|
||||||
|
logger.info(`synced ${name} to v${newState.version}`)
|
||||||
|
if(newMutations.length) {
|
||||||
|
logger.trace({ newMutations, name }, 'recv new mutations')
|
||||||
|
}
|
||||||
|
|
||||||
|
appStateChunk.totalMutations.push(...newMutations)
|
||||||
|
}
|
||||||
|
if(hasMorePatches) {
|
||||||
|
logger.info(`${name} has more patches...`)
|
||||||
|
} else { // collection is done with sync
|
||||||
|
collectionsToHandle.delete(name)
|
||||||
|
}
|
||||||
|
} catch(error) {
|
||||||
|
logger.info({ name, error: error.stack }, 'failed to sync state from version, removing and trying from scratch')
|
||||||
|
await authState.keys.set({ "app-state-sync-version": { [name]: null } })
|
||||||
|
|
||||||
appStateChunk.totalMutations.push(...newMutations)
|
attemptsMap[name] = (attemptsMap[name] || 0) + 1
|
||||||
}
|
if(attemptsMap[name] >= MAX_SYNC_ATTEMPTS) {
|
||||||
if(hasMorePatches) {
|
collectionsToHandle.delete(name)
|
||||||
logger.info(`${name} has more patches...`)
|
}
|
||||||
} else { // collection is done with sync
|
|
||||||
collectionsToHandle.delete(name)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -265,19 +283,6 @@ export const makeChatsSocket = (config: SocketConfig) => {
|
|||||||
return appStateChunk
|
return appStateChunk
|
||||||
}
|
}
|
||||||
|
|
||||||
const resyncAppState = async(collections: WAPatchName[], returnSnapshot: boolean = false) => {
|
|
||||||
let result : AppStateChunk
|
|
||||||
|
|
||||||
try {
|
|
||||||
result = await resyncAppStateInternal(collections, false, returnSnapshot)
|
|
||||||
} catch(error) {
|
|
||||||
logger.info({ collections, error: error.stack }, 'failed to sync state from version, trying from scratch')
|
|
||||||
result = await resyncAppStateInternal(collections, true, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* fetch the profile picture of a user/group
|
* fetch the profile picture of a user/group
|
||||||
* type = "preview" for a low res picture
|
* type = "preview" for a low res picture
|
||||||
|
|||||||
Reference in New Issue
Block a user