feat: implement "snapshot" resyncing

This commit is contained in:
Adhiraj Singh
2021-11-24 18:48:00 +05:30
parent aae2b7a559
commit 983b28ba0e
2 changed files with 146 additions and 55 deletions

View File

@@ -1,7 +1,7 @@
import { SocketConfig, WAPresence, PresenceData, Chat, WAPatchCreate, WAMediaUpload, ChatMutation, WAPatchName, LTHashState, ChatModification, Contact } from "../Types";
import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, S_WHATSAPP_NET } from "../WABinary";
import { proto } from '../../WAProto'
import { generateProfilePicture, toNumber, encodeSyncdPatch, decodePatches, extractSyncdPatches, chatModificationToAppPatch } from "../Utils";
import { generateProfilePicture, toNumber, encodeSyncdPatch, decodePatches, extractSyncdPatches, chatModificationToAppPatch, decodeSyncdSnapshot } from "../Utils";
import { makeMessagesSocket } from "./messages-send";
import makeMutex from "../Utils/make-mutex";
@@ -170,7 +170,9 @@ export const makeChatsSocket = (config: SocketConfig) => {
})
}
const resyncAppState = async(collections: WAPatchName[], fromScratch: boolean = false, returnSnapshot: boolean = false) => {
const resyncAppStateInternal = async(collections: WAPatchName[], fromScratch: boolean = false, returnSnapshot: boolean = false) => {
if(fromScratch) returnSnapshot = true
const states = { } as { [T in WAPatchName]: LTHashState }
for(const name of collections) {
let state: LTHashState = fromScratch ? undefined : await authState.keys.getAppStateSyncVersion(name)
@@ -205,13 +207,20 @@ export const makeChatsSocket = (config: SocketConfig) => {
]
})
const decoded = extractSyncdPatches(result) // extract from binary node
const decoded = await extractSyncdPatches(result) // extract from binary node
for(const key in decoded) {
const name = key as WAPatchName
const { patches, snapshot } = decoded[name]
if(snapshot) {
const newState = await decodeSyncdSnapshot(name, snapshot, authState.keys.getAppStateSyncKey)
states[name] = newState
logger.info(`restored state of ${name} from snapshot to v${newState.version}`)
}
// only process if there are syncd patches
if(decoded[name].length) {
const { newMutations, state: newState } = await decodePatches(name, decoded[name], states[name], authState.keys.getAppStateSyncKey, true)
if(patches.length) {
const { newMutations, state: newState } = await decodePatches(name, patches, states[name], authState.keys.getAppStateSyncKey, true)
await authState.keys.setAppStateSyncVersion(name, newState)
@@ -221,6 +230,15 @@ export const makeChatsSocket = (config: SocketConfig) => {
}
}
const resyncAppState = async(collections: WAPatchName[], returnSnapshot: boolean = false) => {
try {
await resyncAppStateInternal(collections, returnSnapshot)
} catch(error) {
logger.info({ collections, error: error.stack }, 'failed to sync state from version, trying from scratch')
await resyncAppStateInternal(collections, true, true)
}
}
/**
* fetch the profile picture of a user/group
* type = "preview" for a low res picture
@@ -381,12 +399,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
const name = patchCreate.type
await mutationMutex.mutex(
async() => {
try {
await resyncAppState([name])
} catch(error) {
logger.info({ name, error: error.stack }, 'failed to sync state from version, trying from scratch')
await resyncAppState([name], true)
}
await resyncAppState([name])
const { patch, state } = await encodeSyncdPatch(
patchCreate,
authState,

View File

@@ -7,7 +7,7 @@ import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren } from '../WABina
import { toNumber } from './generics'
import { downloadContentFromMessage, } from './messages-media'
export const mutationKeys = (keydata: Uint8Array) => {
const mutationKeys = (keydata: Uint8Array) => {
const expanded = hkdf(keydata, 160, { info: 'WhatsApp Mutation Keys' })
return {
indexKey: expanded.slice(0, 32),
@@ -59,7 +59,7 @@ const computeLtHash = (initial: Uint8Array, macs: Mac[], getPrevSetValueMac: (in
const subOp = getPrevSetValueMac(indexMac, i)
if(operation === proto.SyncdMutation.SyncdMutationSyncdOperation.REMOVE) {
if(!subOp) {
throw new Boom('tried remove, but no buffer', { statusCode: 500 })
throw new Boom('tried remove, but no buffer', { data: { indexMac, valueMac } })
}
} else {
addBuffs.push(new Uint8Array(valueMac).buffer)
@@ -76,7 +76,7 @@ const computeLtHash = (initial: Uint8Array, macs: Mac[], getPrevSetValueMac: (in
return buff
}
export const generateSnapshotMac = (lthash: Uint8Array, version: number, name: WAPatchName, key: Buffer) => {
const generateSnapshotMac = (lthash: Uint8Array, version: number, name: WAPatchName, key: Buffer) => {
const total = Buffer.concat([
lthash,
to64BitNetworkOrder(version),
@@ -84,6 +84,7 @@ export const generateSnapshotMac = (lthash: Uint8Array, version: number, name: W
])
return hmacSign(total, key, 'sha256')
}
const generatePatchMac = (snapshotMac: Uint8Array, valueMacs: Uint8Array[], version: number, type: WAPatchName, key: Buffer) => {
const total = Buffer.concat([
snapshotMac,
@@ -164,11 +165,10 @@ export const encodeSyncdPatch = async(
return { patch, state }
}
export const decodeSyncdPatch = async(
msg: proto.ISyncdPatch,
name: WAPatchName,
export const decodeSyncdMutations = async(
msgMutations: proto.ISyncdMutation[],
getAppStateSyncKey: SignalKeyStore['getAppStateSyncKey'],
validateMacs: boolean = true
validateMacs: boolean
) => {
const keyCache: { [_: string]: ReturnType<typeof mutationKeys> } = { }
const getKey = async(keyId: Uint8Array) => {
@@ -177,7 +177,7 @@ export const decodeSyncdPatch = async(
if(!key) {
const keyEnc = await getAppStateSyncKey(base64Key)
if(!keyEnc) {
throw new Boom(`failed to find key "${base64Key}" to decode mutation`, { statusCode: 500, data: msg })
throw new Boom(`failed to find key "${base64Key}" to decode mutation`, { statusCode: 500, data: { msgMutations } })
}
const result = mutationKeys(keyEnc.keyData!)
keyCache[base64Key] = result
@@ -187,21 +187,10 @@ export const decodeSyncdPatch = async(
}
const mutations: ChatMutation[] = []
if(validateMacs) {
const mainKey = await getKey(msg.keyId!.id)
const mutationmacs = msg.mutations!.map(mutation => mutation.record!.value!.blob!.slice(-32))
const patchMac = generatePatchMac(msg.snapshotMac, mutationmacs, toNumber(msg.version!.version), name, mainKey.patchMacKey)
if(Buffer.compare(patchMac, msg.patchMac) !== 0) {
throw new Boom('Invalid patch mac')
}
}
// indexKey used to HMAC sign record.index.blob
// valueEncryptionKey used to AES-256-CBC encrypt record.value.blob[0:-32]
// the remaining record.value.blob[0:-32] is the mac, it the HMAC sign of key.keyId + decoded proto data + length of bytes in keyId
for(const { operation, record } of msg.mutations!) {
for(const { operation, record } of msgMutations!) {
const key = await getKey(record.keyId!.id!)
const content = Buffer.from(record.value!.blob!)
const encContent = content.slice(0, -32)
@@ -236,43 +225,136 @@ export const decodeSyncdPatch = async(
return { mutations }
}
export const extractSyncdPatches = (result: BinaryNode) => {
export const decodeSyncdPatch = async(
msg: proto.ISyncdPatch,
name: WAPatchName,
getAppStateSyncKey: SignalKeyStore['getAppStateSyncKey'],
validateMacs: boolean
) => {
if(validateMacs) {
const base64Key = Buffer.from(msg.keyId!.id).toString('base64')
const mainKeyObj = await getAppStateSyncKey(base64Key)
const mainKey = mutationKeys(mainKeyObj.keyData!)
const mutationmacs = msg.mutations!.map(mutation => mutation.record!.value!.blob!.slice(-32))
const patchMac = generatePatchMac(msg.snapshotMac, mutationmacs, toNumber(msg.version!.version), name, mainKey.patchMacKey)
if(Buffer.compare(patchMac, msg.patchMac) !== 0) {
throw new Boom('Invalid patch mac')
}
}
const result = await decodeSyncdMutations(msg!.mutations!, getAppStateSyncKey, validateMacs)
return result
}
export const extractSyncdPatches = async(result: BinaryNode) => {
const syncNode = getBinaryNodeChild(result, 'sync')
const collectionNodes = getBinaryNodeChildren(syncNode, 'collection')
const final = { } as { [T in WAPatchName]: proto.ISyncdPatch[] }
for(const collectionNode of collectionNodes) {
const patchesNode = getBinaryNodeChild(collectionNode, 'patches')
const final = { } as { [T in WAPatchName]: { patches: proto.ISyncdPatch[], snapshot?: proto.ISyncdSnapshot } }
await Promise.all(
collectionNodes.map(
async collectionNode => {
const patchesNode = getBinaryNodeChild(collectionNode, 'patches')
const patches = getBinaryNodeChildren(patchesNode || collectionNode, 'patch')
const syncds: proto.ISyncdPatch[] = []
const name = collectionNode.attrs.name as WAPatchName
for(let { content } of patches) {
if(content) {
const syncd = proto.SyncdPatch.decode(content! as Uint8Array)
if(!syncd.version) {
syncd.version = { version: +collectionNode.attrs.version+1 }
const patches = getBinaryNodeChildren(patchesNode || collectionNode, 'patch')
const snapshotNode = getBinaryNodeChild(collectionNode, 'snapshot')
const syncds: proto.ISyncdPatch[] = []
const name = collectionNode.attrs.name as WAPatchName
let snapshot: proto.ISyncdSnapshot | undefined = undefined
if(snapshotNode && !!snapshotNode.content) {
if(!Buffer.isBuffer(snapshotNode)) {
snapshotNode.content = Buffer.from(Object.values(snapshotNode.content))
}
const blobRef = proto.ExternalBlobReference.decode(
snapshotNode.content! as Buffer
)
const data = await downloadExternalBlob(blobRef)
snapshot = proto.SyncdSnapshot.decode(data)
}
syncds.push(syncd)
}
}
final[name] = syncds
}
for(let { content } of patches) {
if(content) {
if(!Buffer.isBuffer(content)) {
content = Buffer.from(Object.values(content))
}
const syncd = proto.SyncdPatch.decode(content! as Uint8Array)
if(!syncd.version) {
syncd.version = { version: +collectionNode.attrs.version+1 }
}
if(syncd.externalMutations) {
const ref = await downloadExternalPatch(syncd.externalMutations)
syncd.mutations.push(...ref.mutations)
}
syncds.push(syncd)
}
}
final[name] = { patches: syncds, snapshot }
}
)
)
return final
}
export const downloadExternalPatch = async(blob: proto.IExternalBlobReference) => {
export const downloadExternalBlob = async(blob: proto.IExternalBlobReference) => {
const stream = await downloadContentFromMessage(blob, 'md-app-state')
let buffer = Buffer.from([])
for await(const chunk of stream) {
buffer = Buffer.concat([buffer, chunk])
}
return buffer
}
export const downloadExternalPatch = async(blob: proto.IExternalBlobReference) => {
const buffer = await downloadExternalBlob(blob)
const syncData = proto.SyncdMutations.decode(buffer)
return syncData
}
export const decodeSyncdSnapshot = async(
name: WAPatchName,
snapshot: proto.ISyncdSnapshot,
getAppStateSyncKey: SignalKeyStore['getAppStateSyncKey'],
validateMacs: boolean = true
) => {
const version = toNumber(snapshot.version!.version!)
const mappedRecords = snapshot.records!.map(
record => ({ record, operation: proto.SyncdMutation.SyncdMutationSyncdOperation.SET })
)
const macs = mappedRecords.map(m => ({
operation: m.operation!,
indexMac: m.record.index!.blob!,
valueMac: m.record.value!.blob!.slice(-32)
}))
const { mutations } = await decodeSyncdMutations(mappedRecords, getAppStateSyncKey, validateMacs)
let hash = Buffer.alloc(128)
hash = computeLtHash(hash, macs, () => undefined)
if(validateMacs) {
const base64Key = Buffer.from(snapshot.keyId!.id!).toString('base64')
const keyEnc = await getAppStateSyncKey(base64Key)
if(!keyEnc) {
throw new Boom(`failed to find key "${base64Key}" to decode mutation`, { statusCode: 500 })
}
const result = mutationKeys(keyEnc.keyData!)
const computedSnapshotMac = generateSnapshotMac(hash, version, name, result.snapshotMacKey)
if(Buffer.compare(snapshot.mac!, computedSnapshotMac) !== 0) {
throw new Boom(`failed to verify LTHash at ${version} of ${name} from snapshot`, { statusCode: 500 })
}
}
const state: LTHashState = { version, mutations, hash }
return state
}
export const decodePatches = async(
name: WAPatchName,
syncds: proto.ISyncdPatch[],
@@ -286,11 +368,7 @@ export const decodePatches = async(
let currentVersion = initial.version
for(const syncd of syncds) {
const { mutations, version, keyId, snapshotMac, externalMutations } = syncd
if(externalMutations) {
const ref = await downloadExternalPatch(externalMutations)
mutations.push(...ref.mutations)
}
const { mutations, version, keyId, snapshotMac } = syncd
const macs = mutations.map(
m => ({
operation: m.operation!,