refactor: history processing

1. fixes issue when some chats/messages are not synced
2. adds info about whether the history chunk is the latest
This commit is contained in:
Adhiraj Singh
2022-01-16 12:51:08 +05:30
parent f09e0f85cc
commit 793b23cb21
3 changed files with 78 additions and 62 deletions

View File

@@ -1,7 +1,7 @@
import { SocketConfig, WAMessageStubType, ParticipantAction, Chat, GroupMetadata, WAMessageKey, Contact } from "../Types"
import { decodeMessageStanza, encodeBigEndian, toNumber, downloadHistory, generateSignalPubKey, xmppPreKey, xmppSignedPreKey } from "../Utils"
import { BinaryNode, jidDecode, jidEncode, isJidStatusBroadcast, areJidsSameUser, getBinaryNodeChildren, jidNormalizedUser, getAllBinaryNodeChildren, BinaryNodeAttributes, isJidGroup } from '../WABinary'
import { SocketConfig, WAMessageStubType, ParticipantAction, Chat, GroupMetadata } from "../Types"
import { decodeMessageStanza, encodeBigEndian, toNumber, downloadAndProcessHistorySyncNotification, generateSignalPubKey, xmppPreKey, xmppSignedPreKey } from "../Utils"
import { BinaryNode, jidDecode, jidEncode, areJidsSameUser, getBinaryNodeChildren, jidNormalizedUser, getAllBinaryNodeChildren, BinaryNodeAttributes, isJidGroup } from '../WABinary'
import { proto } from "../../WAProto"
import { KEY_BUNDLE_TYPE } from "../Defaults"
import { makeChatsSocket } from "./chats"
@@ -38,6 +38,8 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
const msgRetryMap = config.msgRetryCounterMap || { }
const historyCache = new Set<string>()
const sendMessageAck = async({ tag, attrs }: BinaryNode, extraAttrs: BinaryNodeAttributes) => {
const stanza: BinaryNode = {
tag: 'ack',
@@ -122,6 +124,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
logger.info({ msgAttrs: node.attrs, retryCount }, 'sent retry receipt')
})
}
const processMessage = async(message: proto.IWebMessageInfo, chatUpdate: Partial<Chat>) => {
const protocolMsg = message.message?.protocolMessage
if(protocolMsg) {
@@ -129,11 +132,9 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
case proto.ProtocolMessage.ProtocolMessageType.HISTORY_SYNC_NOTIFICATION:
const histNotification = protocolMsg!.historySyncNotification
logger.info({ type: histNotification.syncType!, id: message.key.id }, 'got history notification')
const history = await downloadHistory(histNotification)
logger.info({ histNotification, id: message.key.id }, 'got history notification')
const info = await downloadAndProcessHistorySyncNotification(histNotification, historyCache)
processHistoryMessage(history)
const meJid = authState.creds.me!.id
await sendNode({
tag: 'receipt',
@@ -143,6 +144,8 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
to: jidEncode(jidDecode(meJid).user, 'c.us')
}
})
info && ev.emit('chats.set', info)
break
case proto.ProtocolMessage.ProtocolMessageType.APP_STATE_SYNC_KEY_SHARE:
const keys = protocolMsg.appStateSyncKeyShare!.keys
@@ -227,57 +230,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
}
}
const processHistoryMessage = (item: proto.HistorySync) => {
const messages: proto.IWebMessageInfo[] = []
const contacts: Contact[] = []
switch(item.syncType) {
case proto.HistorySync.HistorySyncHistorySyncType.INITIAL_BOOTSTRAP:
const chats = item.conversations!.map(
c => {
const chat: Chat = { ...c }
if(chat.name) {
contacts.push({
id: chat.id,
name: chat.name
})
}
//@ts-expect-error
delete chat.messages
for(const msg of c.messages || []) {
if(msg.message) {
messages.push(msg.message)
}
}
return chat
}
)
ev.emit('chats.set', { chats, messages, contacts })
break
case proto.HistorySync.HistorySyncHistorySyncType.RECENT:
// push remaining messages
for(const conv of item.conversations) {
for(const m of (conv.messages || [])) {
messages.push(m.message!)
}
}
if(messages.length) {
ev.emit('messages.upsert', { messages, type: 'prepend' })
}
break
case proto.HistorySync.HistorySyncHistorySyncType.PUSH_NAME:
contacts.push(
...item.pushnames.map(
p => ({ notify: p.pushname, id: p.id })
)
)
ev.emit('chats.set', { chats: [], messages: [], contacts })
break
case proto.HistorySync.HistorySyncHistorySyncType.INITIAL_STATUS_V3:
// TODO
break
}
}
const processNotification = (node: BinaryNode): Partial<proto.IWebMessageInfo> => {
const result: Partial<proto.IWebMessageInfo> = { }
const [child] = getAllBinaryNodeChildren(node)