fix: duplicate mutation release on failed patch

This commit is contained in:
Adhiraj Singh
2022-11-08 22:28:27 +05:30
parent e1fc22b3d3
commit f892a1e81f
4 changed files with 76 additions and 59 deletions

View File

@@ -35,7 +35,7 @@
"dependencies": { "dependencies": {
"@hapi/boom": "^9.1.3", "@hapi/boom": "^9.1.3",
"axios": "^0.24.0", "axios": "^0.24.0",
"futoin-hkdf": "^1.5.0", "futoin-hkdf": "^1.5.1",
"libsignal": "git+https://github.com/adiwajshing/libsignal-node", "libsignal": "git+https://github.com/adiwajshing/libsignal-node",
"music-metadata": "^7.12.3", "music-metadata": "^7.12.3",
"node-cache": "^5.1.2", "node-cache": "^5.1.2",

View File

@@ -2,13 +2,13 @@ import { Boom } from '@hapi/boom'
import { proto } from '../../WAProto' import { proto } from '../../WAProto'
import { PROCESSABLE_HISTORY_TYPES } from '../Defaults' import { PROCESSABLE_HISTORY_TYPES } from '../Defaults'
import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, LTHashState, MessageUpsertType, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAMessage, WAPatchCreate, WAPatchName, WAPresence } from '../Types' import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, LTHashState, MessageUpsertType, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAMessage, WAPatchCreate, WAPatchName, WAPresence } from '../Types'
import { chatModificationToAppPatch, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, getHistoryMsg, newLTHashState, processSyncAction } from '../Utils' import { chatModificationToAppPatch, ChatMutationMap, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, getHistoryMsg, newLTHashState, processSyncAction } from '../Utils'
import { makeMutex } from '../Utils/make-mutex' import { makeMutex } from '../Utils/make-mutex'
import processMessage from '../Utils/process-message' import processMessage from '../Utils/process-message'
import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary'
import { makeSocket } from './socket' import { makeSocket } from './socket'
const MAX_SYNC_ATTEMPTS = 5 const MAX_SYNC_ATTEMPTS = 2
export const makeChatsSocket = (config: SocketConfig) => { export const makeChatsSocket = (config: SocketConfig) => {
const { logger, markOnlineOnConnect, shouldSyncHistoryMessage, fireInitQueries } = config const { logger, markOnlineOnConnect, shouldSyncHistoryMessage, fireInitQueries } = config
@@ -293,16 +293,16 @@ export const makeChatsSocket = (config: SocketConfig) => {
} }
const resyncAppState = ev.createBufferedFunction(async(collections: readonly WAPatchName[], isInitialSync: boolean) => { const resyncAppState = ev.createBufferedFunction(async(collections: readonly WAPatchName[], isInitialSync: boolean) => {
const { onMutation } = newAppStateChunkHandler(isInitialSync)
// we use this to determine which events to fire // we use this to determine which events to fire
// otherwise when we resync from scratch -- all notifications will fire // otherwise when we resync from scratch -- all notifications will fire
const initialVersionMap: { [T in WAPatchName]?: number } = { } const initialVersionMap: { [T in WAPatchName]?: number } = { }
const globalMutationMap: ChatMutationMap = { }
await authState.keys.transaction( await authState.keys.transaction(
async() => { async() => {
const collectionsToHandle = new Set<string>(collections) const collectionsToHandle = new Set<string>(collections)
// in case something goes wrong -- ensure we don't enter a loop that cannot be exited from // in case something goes wrong -- ensure we don't enter a loop that cannot be exited from
const attemptsMap = { } as { [T in WAPatchName]: number | undefined } const attemptsMap: { [T in WAPatchName]?: number } = { }
// keep executing till all collections are done // keep executing till all collections are done
// sometimes a single patch request will not return all the patches (God knows why) // sometimes a single patch request will not return all the patches (God knows why)
// so we fetch till they're all done (this is determined by the "has_more_patches" flag) // so we fetch till they're all done (this is determined by the "has_more_patches" flag)
@@ -353,30 +353,29 @@ export const makeChatsSocket = (config: SocketConfig) => {
] ]
}) })
const decoded = await extractSyncdPatches(result, config?.options) // extract from binary node // extract from binary node
const decoded = await extractSyncdPatches(result, config?.options)
for(const key in decoded) { for(const key in decoded) {
const name = key as WAPatchName const name = key as WAPatchName
const { patches, hasMorePatches, snapshot } = decoded[name] const { patches, hasMorePatches, snapshot } = decoded[name]
try { try {
if(snapshot) { if(snapshot) {
const { state: newState } = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey, initialVersionMap[name], onMutation) const { state: newState, mutationMap } = await decodeSyncdSnapshot(name, snapshot, getAppStateSyncKey, initialVersionMap[name])
states[name] = newState states[name] = newState
Object.assign(globalMutationMap, mutationMap)
logger.info( logger.info(`restored state of ${name} from snapshot to v${newState.version} with mutations`)
`restored state of ${name} from snapshot to v${newState.version} with mutations`
)
await authState.keys.set({ 'app-state-sync-version': { [name]: newState } }) await authState.keys.set({ 'app-state-sync-version': { [name]: newState } })
} }
// only process if there are syncd patches // only process if there are syncd patches
if(patches.length) { if(patches.length) {
const { state: newState } = await decodePatches( const { state: newState, mutationMap } = await decodePatches(
name, name,
patches, patches,
states[name], states[name],
getAppStateSyncKey, getAppStateSyncKey,
onMutation,
config.options, config.options,
initialVersionMap[name] initialVersionMap[name]
) )
@@ -384,6 +383,9 @@ export const makeChatsSocket = (config: SocketConfig) => {
await authState.keys.set({ 'app-state-sync-version': { [name]: newState } }) await authState.keys.set({ 'app-state-sync-version': { [name]: newState } })
logger.info(`synced ${name} to v${newState.version}`) logger.info(`synced ${name} to v${newState.version}`)
initialVersionMap[name] = newState.version
Object.assign(globalMutationMap, mutationMap)
} }
if(hasMorePatches) { if(hasMorePatches) {
@@ -396,8 +398,11 @@ export const makeChatsSocket = (config: SocketConfig) => {
// or key not found // or key not found
const isIrrecoverableError = attemptsMap[name]! >= MAX_SYNC_ATTEMPTS const isIrrecoverableError = attemptsMap[name]! >= MAX_SYNC_ATTEMPTS
|| error.output?.statusCode === 404 || error.output?.statusCode === 404
|| error.message.includes('TypeError') || error.name === 'TypeError'
logger.info({ name, error: error.stack }, `failed to sync state from version${isIrrecoverableError ? '' : ', removing and trying from scratch'}`) logger.info(
{ name, error: error.stack },
`failed to sync state from version${isIrrecoverableError ? '' : ', removing and trying from scratch'}`
)
await authState.keys.set({ 'app-state-sync-version': { [name]: null } }) await authState.keys.set({ 'app-state-sync-version': { [name]: null } })
// increment number of retries // increment number of retries
attemptsMap[name] = (attemptsMap[name] || 0) + 1 attemptsMap[name] = (attemptsMap[name] || 0) + 1
@@ -411,6 +416,11 @@ export const makeChatsSocket = (config: SocketConfig) => {
} }
} }
) )
const { onMutation } = newAppStateChunkHandler(isInitialSync)
for(const key in globalMutationMap) {
onMutation(globalMutationMap[key])
}
}) })
/** /**
@@ -580,16 +590,18 @@ export const makeChatsSocket = (config: SocketConfig) => {
if(config.emitOwnEvents) { if(config.emitOwnEvents) {
const { onMutation } = newAppStateChunkHandler(false) const { onMutation } = newAppStateChunkHandler(false)
await decodePatches( const { mutationMap } = await decodePatches(
name, name,
[{ ...encodeResult!.patch, version: { version: encodeResult!.state.version }, }], [{ ...encodeResult!.patch, version: { version: encodeResult!.state.version }, }],
initial!, initial!,
getAppStateSyncKey, getAppStateSyncKey,
onMutation,
config.options, config.options,
undefined, undefined,
logger, logger,
) )
for(const key in mutationMap) {
onMutation(mutationMap[key])
}
} }
} }

View File

@@ -11,6 +11,8 @@ import { downloadContentFromMessage, } from './messages-media'
type FetchAppStateSyncKey = (keyId: string) => Promise<proto.Message.IAppStateSyncKeyData | null | undefined> type FetchAppStateSyncKey = (keyId: string) => Promise<proto.Message.IAppStateSyncKeyData | null | undefined>
export type ChatMutationMap = { [index: string]: ChatMutation }
const mutationKeys = (keydata: Uint8Array) => { const mutationKeys = (keydata: Uint8Array) => {
const expanded = hkdf(keydata, 160, { info: 'WhatsApp Mutation Keys' }) const expanded = hkdf(keydata, 160, { info: 'WhatsApp Mutation Keys' })
return { return {
@@ -50,9 +52,9 @@ const generateMac = (operation: proto.SyncdMutation.SyncdOperation, data: Buffer
} }
const to64BitNetworkOrder = (e: number) => { const to64BitNetworkOrder = (e: number) => {
const t = new ArrayBuffer(8) const buff = Buffer.alloc(8)
new DataView(t).setUint32(4, e, !1) buff.writeUint32BE(e, 4)
return Buffer.from(t) return buff
} }
type Mac = { indexMac: Uint8Array, valueMac: Uint8Array, operation: proto.SyncdMutation.SyncdOperation } type Mac = { indexMac: Uint8Array, valueMac: Uint8Array, operation: proto.SyncdMutation.SyncdOperation }
@@ -84,7 +86,8 @@ const makeLtHashGenerator = ({ indexValueMap, hash }: Pick<LTHashState, 'hash' |
} }
}, },
finish: () => { finish: () => {
const result = LT_HASH_ANTI_TAMPERING.subtractThenAdd(new Uint8Array(hash).buffer, addBuffs, subBuffs) const hashArrayBuffer = new Uint8Array(hash).buffer
const result = LT_HASH_ANTI_TAMPERING.subtractThenAdd(hashArrayBuffer, addBuffs, subBuffs)
const buffer = Buffer.from(result) const buffer = Buffer.from(result)
return { return {
@@ -188,24 +191,6 @@ export const decodeSyncdMutations = async(
onMutation: (mutation: ChatMutation) => void, onMutation: (mutation: ChatMutation) => void,
validateMacs: boolean validateMacs: boolean
) => { ) => {
const keyCache: { [_: string]: ReturnType<typeof mutationKeys> } = { }
const getKey = async(keyId: Uint8Array) => {
const base64Key = Buffer.from(keyId!).toString('base64')
let key = keyCache[base64Key]
if(!key) {
const keyEnc = await getAppStateSyncKey(base64Key)
if(!keyEnc) {
throw new Boom(`failed to find key "${base64Key}" to decode mutation`, { statusCode: 404, data: { msgMutations } })
}
const result = mutationKeys(keyEnc.keyData!)
keyCache[base64Key] = result
key = result
}
return key
}
const ltGenerator = makeLtHashGenerator(initialState) const ltGenerator = makeLtHashGenerator(initialState)
// indexKey used to HMAC sign record.index.blob // indexKey used to HMAC sign record.index.blob
// valueEncryptionKey used to AES-256-CBC encrypt record.value.blob[0:-32] // valueEncryptionKey used to AES-256-CBC encrypt record.value.blob[0:-32]
@@ -248,6 +233,16 @@ export const decodeSyncdMutations = async(
} }
return ltGenerator.finish() return ltGenerator.finish()
async function getKey(keyId: Uint8Array) {
const base64Key = Buffer.from(keyId!).toString('base64')
const keyEnc = await getAppStateSyncKey(base64Key)
if(!keyEnc) {
throw new Boom(`failed to find key "${base64Key}" to decode mutation`, { statusCode: 404, data: { msgMutations } })
}
return mutationKeys(keyEnc.keyData!)
}
} }
export const decodeSyncdPatch = async( export const decodeSyncdPatch = async(
@@ -363,26 +358,25 @@ export const decodeSyncdSnapshot = async(
snapshot: proto.ISyncdSnapshot, snapshot: proto.ISyncdSnapshot,
getAppStateSyncKey: FetchAppStateSyncKey, getAppStateSyncKey: FetchAppStateSyncKey,
minimumVersionNumber: number | undefined, minimumVersionNumber: number | undefined,
onMutation?: (mutation: ChatMutation) => void,
validateMacs: boolean = true validateMacs: boolean = true
) => { ) => {
const newState = newLTHashState() const newState = newLTHashState()
newState.version = toNumber(snapshot.version!.version!) newState.version = toNumber(snapshot.version!.version!)
onMutation = onMutation || (() => { }) const mutationMap: ChatMutationMap = {}
const areMutationsRequired = typeof minimumVersionNumber === 'undefined'
|| newState.version > minimumVersionNumber
const { hash, indexValueMap } = await decodeSyncdMutations( const { hash, indexValueMap } = await decodeSyncdMutations(
snapshot.records!, snapshot.records!,
newState, newState,
getAppStateSyncKey, getAppStateSyncKey,
mutation => { areMutationsRequired
if(onMutation) { ? (mutation) => {
const areMutationsRequired = typeof minimumVersionNumber === 'undefined' || newState.version > minimumVersionNumber const index = mutation.syncAction.index?.toString()
if(areMutationsRequired) { mutationMap[index!] = mutation
onMutation(mutation)
}
} }
}, : () => { },
validateMacs validateMacs
) )
newState.hash = hash newState.hash = hash
@@ -392,18 +386,19 @@ export const decodeSyncdSnapshot = async(
const base64Key = Buffer.from(snapshot.keyId!.id!).toString('base64') const base64Key = Buffer.from(snapshot.keyId!.id!).toString('base64')
const keyEnc = await getAppStateSyncKey(base64Key) const keyEnc = await getAppStateSyncKey(base64Key)
if(!keyEnc) { if(!keyEnc) {
throw new Boom(`failed to find key "${base64Key}" to decode mutation`, { statusCode: 500 }) throw new Boom(`failed to find key "${base64Key}" to decode mutation`)
} }
const result = mutationKeys(keyEnc.keyData!) const result = mutationKeys(keyEnc.keyData!)
const computedSnapshotMac = generateSnapshotMac(newState.hash, newState.version, name, result.snapshotMacKey) const computedSnapshotMac = generateSnapshotMac(newState.hash, newState.version, name, result.snapshotMacKey)
if(Buffer.compare(snapshot.mac!, computedSnapshotMac) !== 0) { if(Buffer.compare(snapshot.mac!, computedSnapshotMac) !== 0) {
throw new Boom(`failed to verify LTHash at ${newState.version} of ${name} from snapshot`, { statusCode: 500 }) throw new Boom(`failed to verify LTHash at ${newState.version} of ${name} from snapshot`)
} }
} }
return { return {
state: newState, state: newState,
mutationMap
} }
} }
@@ -412,21 +407,20 @@ export const decodePatches = async(
syncds: proto.ISyncdPatch[], syncds: proto.ISyncdPatch[],
initial: LTHashState, initial: LTHashState,
getAppStateSyncKey: FetchAppStateSyncKey, getAppStateSyncKey: FetchAppStateSyncKey,
onMutation: (mut: ChatMutation) => void,
options: AxiosRequestConfig<any>, options: AxiosRequestConfig<any>,
minimumVersionNumber?: number, minimumVersionNumber?: number,
logger?: Logger, logger?: Logger,
validateMacs: boolean = true validateMacs: boolean = true
) => { ) => {
syncds = [...syncds]
const newState: LTHashState = { const newState: LTHashState = {
...initial, ...initial,
indexValueMap: { ...initial.indexValueMap } indexValueMap: { ...initial.indexValueMap }
} }
while(syncds.length) { const mutationMap: ChatMutationMap = { }
const syncd = syncds[0]
for(let i = 0;i < syncds.length;i++) {
const syncd = syncds[i]
const { version, keyId, snapshotMac } = syncd const { version, keyId, snapshotMac } = syncd
if(syncd.externalMutations) { if(syncd.externalMutations) {
logger?.trace({ name, version }, 'downloading external patch') logger?.trace({ name, version }, 'downloading external patch')
@@ -439,7 +433,20 @@ export const decodePatches = async(
newState.version = patchVersion newState.version = patchVersion
const shouldMutate = typeof minimumVersionNumber === 'undefined' || patchVersion > minimumVersionNumber const shouldMutate = typeof minimumVersionNumber === 'undefined' || patchVersion > minimumVersionNumber
const decodeResult = await decodeSyncdPatch(syncd, name, newState, getAppStateSyncKey, shouldMutate ? onMutation : (() => { }), validateMacs)
const decodeResult = await decodeSyncdPatch(
syncd,
name,
newState,
getAppStateSyncKey,
shouldMutate
? mutation => {
const index = mutation.syncAction.index?.toString()
mutationMap[index!] = mutation
}
: (() => { }),
validateMacs
)
newState.hash = decodeResult.hash newState.hash = decodeResult.hash
newState.indexValueMap = decodeResult.indexValueMap newState.indexValueMap = decodeResult.indexValueMap
@@ -460,11 +467,9 @@ export const decodePatches = async(
// clear memory used up by the mutations // clear memory used up by the mutations
syncd.mutations = [] syncd.mutations = []
// pop first element
syncds.splice(0, 1)
} }
return { state: newState } return { state: newState, mutationMap }
} }
export const chatModificationToAppPatch = ( export const chatModificationToAppPatch = (

View File

@@ -2448,7 +2448,7 @@ functions-have-names@^1.2.2:
resolved "https://registry.yarnpkg.com/functions-have-names/-/functions-have-names-1.2.3.tgz#0404fe4ee2ba2f607f0e0ec3c80bae994133b834" resolved "https://registry.yarnpkg.com/functions-have-names/-/functions-have-names-1.2.3.tgz#0404fe4ee2ba2f607f0e0ec3c80bae994133b834"
integrity sha512-xckBUXyTIqT97tq2x2AMb+g163b5JFysYk0x4qxNFwbfQkmNZoiRHb6sPzI9/QV33WeuvVYBUIiD4NzNIyqaRQ== integrity sha512-xckBUXyTIqT97tq2x2AMb+g163b5JFysYk0x4qxNFwbfQkmNZoiRHb6sPzI9/QV33WeuvVYBUIiD4NzNIyqaRQ==
futoin-hkdf@^1.5.0: futoin-hkdf@^1.5.1:
version "1.5.1" version "1.5.1"
resolved "https://registry.yarnpkg.com/futoin-hkdf/-/futoin-hkdf-1.5.1.tgz#141f00427bc9950b38a42aa786b99c318b9b688d" resolved "https://registry.yarnpkg.com/futoin-hkdf/-/futoin-hkdf-1.5.1.tgz#141f00427bc9950b38a42aa786b99c318b9b688d"
integrity sha512-g5d0Qp7ks55hYmYmfqn4Nz18XH49lcCR+vvIvHT92xXnsJaGZmY1EtWQWilJ6BQp57heCIXM/rRo+AFep8hGgg== integrity sha512-g5d0Qp7ks55hYmYmfqn4Nz18XH49lcCR+vvIvHT92xXnsJaGZmY1EtWQWilJ6BQp57heCIXM/rRo+AFep8hGgg==