refactor: message decoding

1. removes redundant receipt (was exactly the same as delivery receipt without the timestamp)
2. decodeWAMessage now returns a proper message struct
3. each message stanza results in exactly one message decoded
This commit is contained in:
Adhiraj Singh
2022-01-06 23:27:14 +05:30
parent 0b5d772b08
commit 6a2cb5a413
3 changed files with 51 additions and 124 deletions

View File

@@ -32,7 +32,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
assertingPreKeys,
sendNode,
relayMessage,
sendDeliveryReceipt,
sendReceipt,
resyncMainAppState,
} = sock
@@ -357,102 +357,23 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
}
// recv a message
ws.on('CB:message', async(stanza: BinaryNode) => {
const dec = await decodeMessageStanza(stanza, authState)
const fullMessages: proto.IWebMessageInfo[] = []
const { attrs } = stanza
const isGroup = !!stanza.attrs.participant
const sender = (attrs.participant || attrs.from)?.toString()
const isMe = areJidsSameUser(sender, authState.creds.me!.id)
const remoteJid = jidNormalizedUser(dec.chatId)
const key: WAMessageKey = {
remoteJid,
fromMe: isMe,
id: dec.msgId,
participant: dec.participant
}
const partialMsg: Partial<proto.IWebMessageInfo> = {
messageTimestamp: dec.timestamp,
pushName: dec.pushname
}
if(!dec.failures.length) {
await sendMessageAck(stanza, { class: 'receipt' })
}
// if there were some successful decryptions
if(dec.successes.length) {
// send message receipt
let recpAttrs: { [_: string]: any }
if(isMe) {
recpAttrs = {
type: 'sender',
id: stanza.attrs.id,
to: stanza.attrs.from,
}
if(isGroup) {
recpAttrs.participant = stanza.attrs.participant
} else {
recpAttrs.recipient = stanza.attrs.recipient
}
} else {
const isStatus = isJidStatusBroadcast(stanza.attrs.from)
recpAttrs = {
id: stanza.attrs.id,
}
if(isGroup || isStatus) {
recpAttrs.participant = stanza.attrs.participant
recpAttrs.to = dec.chatId
} else {
recpAttrs.to = jidNormalizedUser(dec.chatId)
}
}
await sendNode({ tag: 'receipt', attrs: recpAttrs })
logger.debug({ msgId: dec.msgId }, 'sent message receipt')
await sendDeliveryReceipt(dec.chatId, dec.participant, [dec.msgId])
logger.debug({ msgId: dec.msgId }, 'sent delivery receipt')
}
for(const msg of dec.successes) {
const message = msg.deviceSentMessage?.message || msg
fullMessages.push({
key,
message,
status: isMe ? proto.WebMessageInfo.WebMessageInfoStatus.SERVER_ACK : null,
...partialMsg
})
}
for(const { error } of dec.failures) {
const msg = await decodeMessageStanza(stanza, authState)
// message failed to decrypt
if(msg.messageStubType === proto.WebMessageInfo.WebMessageInfoStubType.CIPHERTEXT) {
logger.error(
{ msgId: dec.msgId, trace: error.stack, data: error.data },
{ msgId: msg.key.id, params: msg.messageStubParameters },
'failure in decrypting message'
)
await sendRetryRequest(stanza)
fullMessages.push({
key,
messageStubType: WAMessageStubType.CIPHERTEXT,
messageStubParameters: [error.message],
...partialMsg
})
}
if(fullMessages.length) {
ev.emit(
'messages.upsert',
{
messages: fullMessages.map(m => proto.WebMessageInfo.fromObject(m)),
type: stanza.attrs.offline ? 'append' : 'notify'
}
)
} else {
logger.warn({ stanza }, `received node with 0 messages`)
await sendMessageAck(stanza, { class: 'receipt' })
// no type in the receipt => message delivered
await sendReceipt(msg.key.remoteJid!, msg.key.participant, [msg.key.id!], undefined)
logger.debug({ msg: msg.key }, 'sent delivery receipt')
}
msg.key.remoteJid = jidNormalizedUser(msg.key.remoteJid!)
ev.emit('messages.upsert', { messages: [msg], type: stanza.attrs.offline ? 'append' : 'notify' })
})
ws.on('CB:ack,class:message', async(node: BinaryNode) => {
@@ -477,7 +398,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
})
const sendMessagesAgain = async(key: proto.IMessageKey, ids: string[]) => {
const msgs = await Promise.all(
ids.map(id => (
config.getMessage({ ...key, id })

View File

@@ -73,7 +73,10 @@ export const makeMessagesSocket = (config: SocketConfig) => {
}
return mediaConn
}
/**
* generic send receipt function
* used for receipts of phone call, read, delivery etc.
* */
const sendReceipt = async(jid: string, participant: string | undefined, messageIds: string[], type: 'read' | 'read-self' | undefined) => {
const node: BinaryNode = {
tag: 'receipt',
@@ -107,10 +110,6 @@ export const makeMessagesSocket = (config: SocketConfig) => {
await sendNode(node)
}
const sendDeliveryReceipt = (jid: string, participant: string | undefined, messageIds: string[]) => {
return sendReceipt(jid, participant, messageIds, undefined)
}
const sendReadReceipt = async(jid: string, participant: string | undefined, messageIds: string[]) => {
const privacySettings = await fetchPrivacySettings()
// based on privacy settings, we have to change the read type
@@ -424,7 +423,7 @@ export const makeMessagesSocket = (config: SocketConfig) => {
...sock,
assertSessions,
relayMessage,
sendDeliveryReceipt,
sendReceipt,
sendReadReceipt,
refreshMediaConn,
waUploadToServer,

View File

@@ -1,15 +1,15 @@
import { Boom } from '@hapi/boom'
import { unpadRandomMax16 } from "./generics"
import { AuthenticationState } from "../Types"
import { areJidsSameUser, BinaryNode as BinaryNodeM, encodeBinaryNode, isJidBroadcast, isJidGroup, isJidStatusBroadcast, isJidUser } from '../WABinary'
import { WAMessageKey, AuthenticationState } from '../Types'
import { areJidsSameUser, BinaryNode, isJidBroadcast, isJidGroup, isJidStatusBroadcast, isJidUser, jidNormalizedUser } from '../WABinary'
import { decryptGroupSignalProto, decryptSignalProto, processSenderKeyMessage } from './signal'
import { proto } from '../../WAProto'
type MessageType = 'chat' | 'peer_broadcast' | 'other_broadcast' | 'group' | 'direct_peer_status' | 'other_status'
export const decodeMessageStanza = async(stanza: BinaryNodeM, auth: AuthenticationState) => {
const deviceIdentity = (stanza.content as BinaryNodeM[])?.find(m => m.tag === 'device-identity')
const deviceIdentityBytes = deviceIdentity ? deviceIdentity.content as Buffer : undefined
export const decodeMessageStanza = async(stanza: BinaryNode, auth: AuthenticationState) => {
//const deviceIdentity = (stanza.content as BinaryNodeM[])?.find(m => m.tag === 'device-identity')
//const deviceIdentityBytes = deviceIdentity ? deviceIdentity.content as Buffer : undefined
let msgType: MessageType
let chatId: string
@@ -53,18 +53,34 @@ export const decodeMessageStanza = async(stanza: BinaryNodeM, auth: Authenticati
chatId = from
author = participant
}
const sender = msgType === 'chat' ? author : chatId
const successes: proto.Message[] = []
const failures: { error: Boom }[] = []
const fromMe = isMe(participant || chatId)
const pushname = stanza.attrs.notify
const key: WAMessageKey = {
remoteJid: chatId,
fromMe,
id: msgId,
participant
}
const fullMessage: proto.IWebMessageInfo = {
key,
messageTimestamp: +stanza.attrs.t,
pushName: pushname,
status: key.fromMe ? proto.WebMessageInfo.WebMessageInfoStatus.SERVER_ACK : null,
}
if(Array.isArray(stanza.content)) {
for(const { tag, attrs, content } of stanza.content as BinaryNodeM[]) {
for(const { tag, attrs, content } of stanza.content) {
if(tag !== 'enc') continue
if(!Buffer.isBuffer(content) && !(content instanceof Uint8Array)) continue
if(!(content instanceof Uint8Array)) continue
let msgBuffer: Buffer
try {
let msgBuffer: Buffer
const e2eType = attrs.type
switch(e2eType) {
case 'skmsg':
@@ -76,28 +92,20 @@ export const decodeMessageStanza = async(stanza: BinaryNodeM, auth: Authenticati
msgBuffer = await decryptSignalProto(user, e2eType, content as Buffer, auth)
break
}
const msg = proto.Message.decode(unpadRandomMax16(msgBuffer))
let msg: proto.IMessage = proto.Message.decode(unpadRandomMax16(msgBuffer))
msg = msg.deviceSentMessage?.message || msg
if(msg.senderKeyDistributionMessage) {
await processSenderKeyMessage(author, msg.senderKeyDistributionMessage, auth)
}
successes.push(msg)
if(fullMessage.message) Object.assign(fullMessage.message, msg)
else fullMessage.message = msg
} catch(error) {
failures.push({ error: new Boom(error, { data: Buffer.from(encodeBinaryNode(stanza)).toString('base64') }) })
fullMessage.messageStubType = proto.WebMessageInfo.WebMessageInfoStubType.CIPHERTEXT
fullMessage.messageStubParameters = [error.message]
}
}
}
return {
msgId,
chatId,
author,
from,
timestamp: +stanza.attrs.t,
participant,
recipient,
pushname: stanza.attrs.notify,
successes,
failures
}
return fullMessage
}