mirror of
https://github.com/FranP-code/Baileys.git
synced 2025-10-13 00:32:22 +00:00
feat: implement event buffer for offline msgs
!BREAKING_CHANGE 1. this allows all offline notifications to be combined into a batch -- to reduce the number of events being sent out 2. to enable the above, the "message.reaction" event has been made an array. Also removes the need for the "operation" field 3. have also now started processing all events under a single queue to prevent state sync problems
This commit is contained in:
@@ -1,16 +1,19 @@
|
||||
import { Boom } from '@hapi/boom'
|
||||
import { proto } from '../../WAProto'
|
||||
import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, InitialReceivedChatsState, LTHashState, PresenceData, SocketConfig, SyncActionUpdates, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAPatchCreate, WAPatchName, WAPresence } from '../Types'
|
||||
import { chatModificationToAppPatch, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, newAppStateChunk, newLTHashState, processSyncAction, syncActionUpdatesToEventMap } from '../Utils'
|
||||
import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, InitialReceivedChatsState, LTHashState, MessageUpsertType, PresenceData, SocketConfig, SyncActionUpdates, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAMessage, WAPatchCreate, WAPatchName, WAPresence } from '../Types'
|
||||
import { chatModificationToAppPatch, debouncedTimeout, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, isHistoryMsg, newAppStateChunk, newLTHashState, processSyncAction, syncActionUpdatesToEventMap } from '../Utils'
|
||||
import { makeMutex } from '../Utils/make-mutex'
|
||||
import processMessage from '../Utils/process-message'
|
||||
import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidNormalizedUser, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary'
|
||||
import { makeMessagesSocket } from './messages-send'
|
||||
import { makeSocket } from './socket'
|
||||
|
||||
const MAX_SYNC_ATTEMPTS = 5
|
||||
|
||||
const APP_STATE_SYNC_TIMEOUT_MS = 10_000
|
||||
|
||||
export const makeChatsSocket = (config: SocketConfig) => {
|
||||
const { logger, markOnlineOnConnect } = config
|
||||
const sock = makeMessagesSocket(config)
|
||||
const { logger, markOnlineOnConnect, treatCiphertextMessagesAsReal, downloadHistory } = config
|
||||
const sock = makeSocket(config)
|
||||
const {
|
||||
ev,
|
||||
ws,
|
||||
@@ -18,18 +21,60 @@ export const makeChatsSocket = (config: SocketConfig) => {
|
||||
generateMessageTag,
|
||||
sendNode,
|
||||
query,
|
||||
fetchPrivacySettings,
|
||||
onUnexpectedError,
|
||||
emitEventsFromMap,
|
||||
} = sock
|
||||
|
||||
let privacySettings: { [_: string]: string } | undefined
|
||||
|
||||
const mutationMutex = makeMutex()
|
||||
/** this mutex ensures that the notifications (receipts, messages etc.) are processed in order */
|
||||
const processingMutex = makeMutex()
|
||||
/** cache to ensure new history sync events do not have duplicate items */
|
||||
const historyCache = new Set<string>()
|
||||
let recvChats: InitialReceivedChatsState = { }
|
||||
|
||||
const appStateSyncTimeout = debouncedTimeout(
|
||||
APP_STATE_SYNC_TIMEOUT_MS,
|
||||
async() => {
|
||||
logger.info(
|
||||
{ recvChats: Object.keys(recvChats).length },
|
||||
'doing initial app state sync'
|
||||
)
|
||||
if(ws.readyState === ws.OPEN) {
|
||||
await resyncMainAppState(recvChats)
|
||||
}
|
||||
|
||||
historyCache.clear()
|
||||
recvChats = { }
|
||||
}
|
||||
)
|
||||
|
||||
/** helper function to fetch the given app state sync key */
|
||||
const getAppStateSyncKey = async(keyId: string) => {
|
||||
const { [keyId]: key } = await authState.keys.get('app-state-sync-key', [keyId])
|
||||
return key
|
||||
}
|
||||
|
||||
const fetchPrivacySettings = async(force: boolean = false) => {
|
||||
if(!privacySettings || force) {
|
||||
const { content } = await query({
|
||||
tag: 'iq',
|
||||
attrs: {
|
||||
xmlns: 'privacy',
|
||||
to: S_WHATSAPP_NET,
|
||||
type: 'get'
|
||||
},
|
||||
content: [
|
||||
{ tag: 'privacy', attrs: { } }
|
||||
]
|
||||
})
|
||||
privacySettings = reduceBinaryNodeToDictionary(content[0] as BinaryNode, 'category')
|
||||
}
|
||||
|
||||
return privacySettings
|
||||
}
|
||||
|
||||
/** helper function to run a generic IQ query */
|
||||
const interactiveQuery = async(userNodes: BinaryNode[], queryNode: BinaryNode) => {
|
||||
const result = await query({
|
||||
@@ -639,6 +684,53 @@ export const makeChatsSocket = (config: SocketConfig) => {
|
||||
])
|
||||
}
|
||||
|
||||
const processMessageLocal = async(msg: proto.IWebMessageInfo) => {
|
||||
// process message and emit events
|
||||
const newEvents = await processMessage(
|
||||
msg,
|
||||
{
|
||||
downloadHistory,
|
||||
historyCache,
|
||||
recvChats,
|
||||
creds: authState.creds,
|
||||
keyStore: authState.keys,
|
||||
logger,
|
||||
treatCiphertextMessagesAsReal
|
||||
}
|
||||
)
|
||||
|
||||
const isAnyHistoryMsg = isHistoryMsg(msg.message)
|
||||
if(isAnyHistoryMsg) {
|
||||
// we only want to sync app state once we've all the history
|
||||
// restart the app state sync timeout
|
||||
logger.debug('restarting app sync timeout')
|
||||
appStateSyncTimeout.start()
|
||||
}
|
||||
|
||||
return newEvents
|
||||
}
|
||||
|
||||
const upsertMessage = async(msg: WAMessage, type: MessageUpsertType) => {
|
||||
ev.emit('messages.upsert', { messages: [msg], type })
|
||||
|
||||
if(!!msg.pushName) {
|
||||
let jid = msg.key.fromMe ? authState.creds.me!.id : (msg.key.participant || msg.key.remoteJid)
|
||||
jid = jidNormalizedUser(jid)
|
||||
|
||||
if(!msg.key.fromMe) {
|
||||
ev.emit('contacts.update', [{ id: jid, notify: msg.pushName, verifiedName: msg.verifiedBizName }])
|
||||
}
|
||||
|
||||
// update our pushname too
|
||||
if(msg.key.fromMe && authState.creds.me?.name !== msg.pushName) {
|
||||
ev.emit('creds.update', { me: { ...authState.creds.me!, name: msg.pushName! } })
|
||||
}
|
||||
}
|
||||
|
||||
const events = await processMessageLocal(msg)
|
||||
emitEventsFromMap(events)
|
||||
}
|
||||
|
||||
ws.on('CB:presence', handlePresenceUpdate)
|
||||
ws.on('CB:chatstate', handlePresenceUpdate)
|
||||
|
||||
@@ -664,17 +756,6 @@ export const makeChatsSocket = (config: SocketConfig) => {
|
||||
}
|
||||
})
|
||||
|
||||
ws.on('CB:notification,type:server_sync', (node: BinaryNode) => {
|
||||
const update = getBinaryNodeChild(node, 'collection')
|
||||
if(update) {
|
||||
const name = update.attrs.name as WAPatchName
|
||||
mutationMutex.mutex(() => (
|
||||
resyncAppState([name], undefined)
|
||||
.catch(err => logger.error({ trace: err.stack, node }, 'failed to sync state'))
|
||||
))
|
||||
}
|
||||
})
|
||||
|
||||
ev.on('connection.update', ({ connection }) => {
|
||||
if(connection === 'open') {
|
||||
fireInitQueries()
|
||||
@@ -686,6 +767,10 @@ export const makeChatsSocket = (config: SocketConfig) => {
|
||||
|
||||
return {
|
||||
...sock,
|
||||
mutationMutex,
|
||||
processingMutex,
|
||||
fetchPrivacySettings,
|
||||
upsertMessage,
|
||||
appPatch,
|
||||
sendPresenceUpdate,
|
||||
presenceSubscribe,
|
||||
|
||||
@@ -2,11 +2,11 @@ import { proto } from '../../WAProto'
|
||||
import { GroupMetadata, ParticipantAction, SocketConfig, WAMessageKey, WAMessageStubType } from '../Types'
|
||||
import { generateMessageID, unixTimestampSeconds } from '../Utils'
|
||||
import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, jidEncode, jidNormalizedUser } from '../WABinary'
|
||||
import { makeSocket } from './socket'
|
||||
import { makeChatsSocket } from './chats'
|
||||
|
||||
export const makeGroupsSocket = (config: SocketConfig) => {
|
||||
const sock = makeSocket(config)
|
||||
const { authState, ev, query } = sock
|
||||
const sock = makeChatsSocket(config)
|
||||
const { authState, ev, query, upsertMessage } = sock
|
||||
|
||||
const groupQuery = async(jid: string, type: 'get' | 'set', content: BinaryNode[]) => (
|
||||
query({
|
||||
@@ -175,25 +175,23 @@ export const makeGroupsSocket = (config: SocketConfig) => {
|
||||
}
|
||||
|
||||
// generate the group add message
|
||||
ev.emit('messages.upsert', {
|
||||
messages: [
|
||||
{
|
||||
key: {
|
||||
remoteJid: inviteMessage.groupJid,
|
||||
id: generateMessageID(),
|
||||
fromMe: false,
|
||||
participant: key.remoteJid,
|
||||
},
|
||||
messageStubType: WAMessageStubType.GROUP_PARTICIPANT_ADD,
|
||||
messageStubParameters: [
|
||||
authState.creds.me!.id
|
||||
],
|
||||
await upsertMessage(
|
||||
{
|
||||
key: {
|
||||
remoteJid: inviteMessage.groupJid,
|
||||
id: generateMessageID(),
|
||||
fromMe: false,
|
||||
participant: key.remoteJid,
|
||||
messageTimestamp: unixTimestampSeconds()
|
||||
}
|
||||
],
|
||||
type: 'notify'
|
||||
})
|
||||
},
|
||||
messageStubType: WAMessageStubType.GROUP_PARTICIPANT_ADD,
|
||||
messageStubParameters: [
|
||||
authState.creds.me!.id
|
||||
],
|
||||
participant: key.remoteJid,
|
||||
messageTimestamp: unixTimestampSeconds()
|
||||
},
|
||||
'notify'
|
||||
)
|
||||
|
||||
return results.attrs.from
|
||||
},
|
||||
|
||||
@@ -1,65 +1,40 @@
|
||||
|
||||
import { proto } from '../../WAProto'
|
||||
import { KEY_BUNDLE_TYPE, MIN_PREKEY_COUNT } from '../Defaults'
|
||||
import { BaileysEventMap, InitialReceivedChatsState, MessageReceiptType, MessageRelayOptions, MessageUserReceipt, SocketConfig, WACallEvent, WAMessageKey, WAMessageStubType } from '../Types'
|
||||
import { debouncedTimeout, decodeMediaRetryNode, decodeMessageStanza, delay, encodeBigEndian, generateSignalPubKey, getCallStatusFromNode, getNextPreKeys, getStatusFromReceiptType, normalizeMessageContent, unixTimestampSeconds, xmppPreKey, xmppSignedPreKey } from '../Utils'
|
||||
import { makeKeyedMutex, makeMutex } from '../Utils/make-mutex'
|
||||
import processMessage, { cleanMessage } from '../Utils/process-message'
|
||||
import { MessageReceiptType, MessageRelayOptions, MessageUserReceipt, SocketConfig, WACallEvent, WAMessageKey, WAMessageStubType, WAPatchName } from '../Types'
|
||||
import { decodeMediaRetryNode, decodeMessageStanza, delay, encodeBigEndian, generateSignalPubKey, getCallStatusFromNode, getNextPreKeys, getStatusFromReceiptType, isHistoryMsg, unixTimestampSeconds, xmppPreKey, xmppSignedPreKey } from '../Utils'
|
||||
import { makeMutex } from '../Utils/make-mutex'
|
||||
import { cleanMessage } from '../Utils/process-message'
|
||||
import { areJidsSameUser, BinaryNode, BinaryNodeAttributes, getAllBinaryNodeChildren, getBinaryNodeChild, getBinaryNodeChildren, isJidGroup, isJidUser, jidDecode, jidEncode, jidNormalizedUser, S_WHATSAPP_NET } from '../WABinary'
|
||||
import { makeChatsSocket } from './chats'
|
||||
import { extractGroupMetadata } from './groups'
|
||||
|
||||
const APP_STATE_SYNC_TIMEOUT_MS = 10_000
|
||||
import { makeMessagesSocket } from './messages-send'
|
||||
|
||||
export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
||||
const {
|
||||
logger,
|
||||
treatCiphertextMessagesAsReal,
|
||||
retryRequestDelayMs,
|
||||
downloadHistory,
|
||||
getMessage
|
||||
} = config
|
||||
const sock = makeChatsSocket(config)
|
||||
const sock = makeMessagesSocket(config)
|
||||
const {
|
||||
ev,
|
||||
authState,
|
||||
ws,
|
||||
mutationMutex,
|
||||
processingMutex,
|
||||
upsertMessage,
|
||||
resyncAppState,
|
||||
onUnexpectedError,
|
||||
assertSessions,
|
||||
sendNode,
|
||||
relayMessage,
|
||||
sendReceipt,
|
||||
resyncMainAppState,
|
||||
emitEventsFromMap,
|
||||
uploadPreKeys,
|
||||
} = sock
|
||||
|
||||
/** this mutex ensures that the notifications (receipts, messages etc.) are processed in order */
|
||||
const processingMutex = makeKeyedMutex()
|
||||
|
||||
/** this mutex ensures that each retryRequest will wait for the previous one to finish */
|
||||
const retryMutex = makeMutex()
|
||||
|
||||
/** cache to ensure new history sync events do not have duplicate items */
|
||||
const historyCache = new Set<string>()
|
||||
let recvChats: InitialReceivedChatsState = { }
|
||||
|
||||
const appStateSyncTimeout = debouncedTimeout(
|
||||
APP_STATE_SYNC_TIMEOUT_MS,
|
||||
async() => {
|
||||
logger.info(
|
||||
{ recvChats: Object.keys(recvChats).length },
|
||||
'doing initial app state sync'
|
||||
)
|
||||
if(ws.readyState === ws.OPEN) {
|
||||
await resyncMainAppState(recvChats)
|
||||
}
|
||||
|
||||
historyCache.clear()
|
||||
recvChats = { }
|
||||
}
|
||||
)
|
||||
|
||||
const msgRetryMap = config.msgRetryCounterMap || { }
|
||||
const callOfferData: { [id: string]: WACallEvent } = { }
|
||||
|
||||
@@ -169,37 +144,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
||||
)
|
||||
}
|
||||
|
||||
const processMessageLocal = async(msg: proto.IWebMessageInfo) => {
|
||||
// process message and emit events
|
||||
const newEvents = await processMessage(
|
||||
msg,
|
||||
{
|
||||
downloadHistory,
|
||||
historyCache,
|
||||
recvChats,
|
||||
creds: authState.creds,
|
||||
keyStore: authState.keys,
|
||||
logger,
|
||||
treatCiphertextMessagesAsReal
|
||||
}
|
||||
)
|
||||
|
||||
// send ack for history message
|
||||
const normalizedContent = !!msg.message ? normalizeMessageContent(msg.message) : undefined
|
||||
const isAnyHistoryMsg = !!normalizedContent?.protocolMessage?.historySyncNotification
|
||||
if(isAnyHistoryMsg) {
|
||||
// we only want to sync app state once we've all the history
|
||||
// restart the app state sync timeout
|
||||
logger.debug('restarting app sync timeout')
|
||||
appStateSyncTimeout.start()
|
||||
|
||||
const jid = jidEncode(jidDecode(msg.key.remoteJid!).user, 'c.us')
|
||||
await sendReceipt(jid, undefined, [msg.key.id], 'hist_sync')
|
||||
}
|
||||
|
||||
return newEvents
|
||||
}
|
||||
|
||||
const handleEncryptNotification = async(node: BinaryNode) => {
|
||||
const from = node.attrs.from
|
||||
if(from === S_WHATSAPP_NET) {
|
||||
@@ -223,7 +167,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
||||
}
|
||||
}
|
||||
|
||||
const processNotification = (node: BinaryNode) => {
|
||||
const processNotification = async(node: BinaryNode) => {
|
||||
const result: Partial<proto.IWebMessageInfo> = { }
|
||||
const [child] = getAllBinaryNodeChildren(node)
|
||||
const nodeType = node.attrs.type
|
||||
@@ -294,13 +238,19 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
||||
const event = decodeMediaRetryNode(node)
|
||||
ev.emit('messages.media-update', [event])
|
||||
} else if(nodeType === 'encrypt') {
|
||||
handleEncryptNotification(node)
|
||||
await handleEncryptNotification(node)
|
||||
} else if(nodeType === 'devices') {
|
||||
const devices = getBinaryNodeChildren(child, 'device')
|
||||
if(areJidsSameUser(child.attrs.jid, authState.creds!.me!.id)) {
|
||||
const deviceJids = devices.map(d => d.attrs.jid)
|
||||
logger.info({ deviceJids }, 'got my own devices')
|
||||
}
|
||||
} else if(nodeType === 'server_sync') {
|
||||
const update = getBinaryNodeChild(node, 'collection')
|
||||
if(update) {
|
||||
const name = update.attrs.name as WAPatchName
|
||||
await mutationMutex.mutex(() => resyncAppState([name], undefined))
|
||||
}
|
||||
}
|
||||
|
||||
if(Object.keys(result).length) {
|
||||
@@ -372,191 +322,156 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
||||
participant: attrs.participant
|
||||
}
|
||||
|
||||
await processingMutex.mutex(
|
||||
remoteJid,
|
||||
async() => {
|
||||
const status = getStatusFromReceiptType(attrs.type)
|
||||
if(
|
||||
typeof status !== 'undefined' &&
|
||||
(
|
||||
// basically, we only want to know when a message from us has been delivered to/read by the other person
|
||||
// or another device of ours has read some messages
|
||||
status > proto.WebMessageInfo.WebMessageInfoStatus.DELIVERY_ACK ||
|
||||
!isNodeFromMe
|
||||
)
|
||||
) {
|
||||
if(isJidGroup(remoteJid)) {
|
||||
if(attrs.participant) {
|
||||
const updateKey: keyof MessageUserReceipt = status === proto.WebMessageInfo.WebMessageInfoStatus.DELIVERY_ACK ? 'receiptTimestamp' : 'readTimestamp'
|
||||
await Promise.all([
|
||||
processingMutex.mutex(
|
||||
async() => {
|
||||
const status = getStatusFromReceiptType(attrs.type)
|
||||
if(
|
||||
typeof status !== 'undefined' &&
|
||||
(
|
||||
// basically, we only want to know when a message from us has been delivered to/read by the other person
|
||||
// or another device of ours has read some messages
|
||||
status > proto.WebMessageInfo.WebMessageInfoStatus.DELIVERY_ACK ||
|
||||
!isNodeFromMe
|
||||
)
|
||||
) {
|
||||
if(isJidGroup(remoteJid)) {
|
||||
if(attrs.participant) {
|
||||
const updateKey: keyof MessageUserReceipt = status === proto.WebMessageInfo.WebMessageInfoStatus.DELIVERY_ACK ? 'receiptTimestamp' : 'readTimestamp'
|
||||
ev.emit(
|
||||
'message-receipt.update',
|
||||
ids.map(id => ({
|
||||
key: { ...key, id },
|
||||
receipt: {
|
||||
userJid: jidNormalizedUser(attrs.participant),
|
||||
[updateKey]: +attrs.t
|
||||
}
|
||||
}))
|
||||
)
|
||||
}
|
||||
} else {
|
||||
ev.emit(
|
||||
'message-receipt.update',
|
||||
'messages.update',
|
||||
ids.map(id => ({
|
||||
key: { ...key, id },
|
||||
receipt: {
|
||||
userJid: jidNormalizedUser(attrs.participant),
|
||||
[updateKey]: +attrs.t
|
||||
}
|
||||
update: { status }
|
||||
}))
|
||||
)
|
||||
}
|
||||
} else {
|
||||
ev.emit(
|
||||
'messages.update',
|
||||
ids.map(id => ({
|
||||
key: { ...key, id },
|
||||
update: { status }
|
||||
}))
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
await sendMessageAck(node)
|
||||
|
||||
if(attrs.type === 'retry') {
|
||||
// correctly set who is asking for the retry
|
||||
key.participant = key.participant || attrs.from
|
||||
if(willSendMessageAgain(ids[0], key.participant)) {
|
||||
if(key.fromMe) {
|
||||
try {
|
||||
logger.debug({ attrs, key }, 'recv retry request')
|
||||
await sendMessagesAgain(key, ids)
|
||||
} catch(error) {
|
||||
logger.error({ key, ids, trace: error.stack }, 'error in sending message again')
|
||||
if(attrs.type === 'retry') {
|
||||
// correctly set who is asking for the retry
|
||||
key.participant = key.participant || attrs.from
|
||||
if(willSendMessageAgain(ids[0], key.participant)) {
|
||||
if(key.fromMe) {
|
||||
try {
|
||||
logger.debug({ attrs, key }, 'recv retry request')
|
||||
await sendMessagesAgain(key, ids)
|
||||
} catch(error) {
|
||||
logger.error({ key, ids, trace: error.stack }, 'error in sending message again')
|
||||
}
|
||||
} else {
|
||||
logger.info({ attrs, key }, 'recv retry for not fromMe message')
|
||||
}
|
||||
} else {
|
||||
logger.info({ attrs, key }, 'recv retry for not fromMe message')
|
||||
logger.info({ attrs, key }, 'will not send message again, as sent too many times')
|
||||
}
|
||||
} else {
|
||||
logger.info({ attrs, key }, 'will not send message again, as sent too many times')
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
),
|
||||
sendMessageAck(node)
|
||||
])
|
||||
}
|
||||
|
||||
const handleNotification = async(node: BinaryNode) => {
|
||||
const remoteJid = node.attrs.from
|
||||
await sendMessageAck(node)
|
||||
const msg = processNotification(node)
|
||||
if(msg) {
|
||||
const fromMe = areJidsSameUser(node.attrs.participant || remoteJid, authState.creds.me!.id)
|
||||
msg.key = {
|
||||
remoteJid,
|
||||
fromMe,
|
||||
participant: node.attrs.participant,
|
||||
id: node.attrs.id,
|
||||
...(msg.key || {})
|
||||
}
|
||||
msg.participant = node.attrs.participant
|
||||
msg.messageTimestamp = +node.attrs.t
|
||||
await Promise.all([
|
||||
processingMutex.mutex(
|
||||
async() => {
|
||||
const msg = await processNotification(node)
|
||||
if(msg) {
|
||||
const fromMe = areJidsSameUser(node.attrs.participant || remoteJid, authState.creds.me!.id)
|
||||
msg.key = {
|
||||
remoteJid,
|
||||
fromMe,
|
||||
participant: node.attrs.participant,
|
||||
id: node.attrs.id,
|
||||
...(msg.key || {})
|
||||
}
|
||||
msg.participant = node.attrs.participant
|
||||
msg.messageTimestamp = +node.attrs.t
|
||||
|
||||
const fullMsg = proto.WebMessageInfo.fromObject(msg)
|
||||
ev.emit('messages.upsert', { messages: [fullMsg], type: 'append' })
|
||||
}
|
||||
}
|
||||
|
||||
const handleUpsertedMessages = async({ messages, type }: BaileysEventMap<any>['messages.upsert']) => {
|
||||
if(type === 'notify' || type === 'append') {
|
||||
const contactNameUpdates: { [_: string]: string } = { }
|
||||
for(const msg of messages) {
|
||||
const normalizedChatId = jidNormalizedUser(msg.key.remoteJid)
|
||||
if(!!msg.pushName) {
|
||||
let jid = msg.key.fromMe ? authState.creds.me!.id : (msg.key.participant || msg.key.remoteJid)
|
||||
jid = jidNormalizedUser(jid)
|
||||
|
||||
contactNameUpdates[jid] = msg.pushName
|
||||
// update our pushname too
|
||||
if(msg.key.fromMe && authState.creds.me?.name !== msg.pushName) {
|
||||
ev.emit('creds.update', { me: { ...authState.creds.me!, name: msg.pushName! } })
|
||||
const fullMsg = proto.WebMessageInfo.fromObject(msg)
|
||||
await upsertMessage(fullMsg, 'append')
|
||||
}
|
||||
}
|
||||
|
||||
const events = await processingMutex.mutex(
|
||||
'p-' + normalizedChatId,
|
||||
() => processMessageLocal(msg)
|
||||
)
|
||||
emitEventsFromMap(events)
|
||||
}
|
||||
|
||||
if(Object.keys(contactNameUpdates).length) {
|
||||
ev.emit('contacts.update', Object.keys(contactNameUpdates).map(
|
||||
id => ({ id, notify: contactNameUpdates[id] })
|
||||
))
|
||||
}
|
||||
}
|
||||
),
|
||||
sendMessageAck(node)
|
||||
])
|
||||
}
|
||||
|
||||
const handleBadAck = async({ attrs }: BinaryNode) => {
|
||||
// current hypothesis is that if pash is sent in the ack
|
||||
// it means -- the message hasn't reached all devices yet
|
||||
// we'll retry sending the message here
|
||||
if(attrs.phash) {
|
||||
logger.info({ attrs }, 'received phash in ack, resending message...')
|
||||
const key: WAMessageKey = { remoteJid: attrs.from, fromMe: true, id: attrs.id }
|
||||
const msg = await getMessage(key)
|
||||
if(msg) {
|
||||
await relayMessage(key.remoteJid, msg, { messageId: key.id, useUserDevicesCache: false })
|
||||
} else {
|
||||
logger.warn({ attrs }, 'could not send message again, as it was not found')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// recv a message
|
||||
ws.on('CB:message', (stanza: BinaryNode) => {
|
||||
const { fullMessage: msg, category, author, decryptionTask } = decodeMessageStanza(stanza, authState)
|
||||
processingMutex.mutex(
|
||||
msg.key.remoteJid!,
|
||||
async() => {
|
||||
await sendMessageAck(stanza)
|
||||
await decryptionTask
|
||||
// message failed to decrypt
|
||||
if(msg.messageStubType === proto.WebMessageInfo.WebMessageInfoStubType.CIPHERTEXT) {
|
||||
logger.error(
|
||||
{ key: msg.key, params: msg.messageStubParameters },
|
||||
'failure in decrypting message'
|
||||
)
|
||||
retryMutex.mutex(
|
||||
async() => {
|
||||
if(ws.readyState === ws.OPEN) {
|
||||
await sendRetryRequest(stanza)
|
||||
if(retryRequestDelayMs) {
|
||||
await delay(retryRequestDelayMs)
|
||||
const handleMessage = async(node: BinaryNode) => {
|
||||
const { fullMessage: msg, category, author, decryptionTask } = decodeMessageStanza(node, authState)
|
||||
await Promise.all([
|
||||
processingMutex.mutex(
|
||||
async() => {
|
||||
await decryptionTask
|
||||
// message failed to decrypt
|
||||
if(msg.messageStubType === proto.WebMessageInfo.WebMessageInfoStubType.CIPHERTEXT) {
|
||||
logger.error(
|
||||
{ key: msg.key, params: msg.messageStubParameters },
|
||||
'failure in decrypting message'
|
||||
)
|
||||
retryMutex.mutex(
|
||||
async() => {
|
||||
if(ws.readyState === ws.OPEN) {
|
||||
await sendRetryRequest(node)
|
||||
if(retryRequestDelayMs) {
|
||||
await delay(retryRequestDelayMs)
|
||||
}
|
||||
} else {
|
||||
logger.debug({ node }, 'connection closed, ignoring retry req')
|
||||
}
|
||||
} else {
|
||||
logger.debug({ stanza }, 'connection closed, ignoring retry req')
|
||||
}
|
||||
)
|
||||
} else {
|
||||
// no type in the receipt => message delivered
|
||||
let type: MessageReceiptType = undefined
|
||||
let participant = msg.key.participant
|
||||
if(category === 'peer') { // special peer message
|
||||
type = 'peer_msg'
|
||||
} else if(msg.key.fromMe) { // message was sent by us from a different device
|
||||
type = 'sender'
|
||||
// need to specially handle this case
|
||||
if(isJidUser(msg.key.remoteJid)) {
|
||||
participant = author
|
||||
}
|
||||
} else if(!sendActiveReceipts) {
|
||||
type = 'inactive'
|
||||
}
|
||||
)
|
||||
} else {
|
||||
// no type in the receipt => message delivered
|
||||
let type: MessageReceiptType = undefined
|
||||
let participant = msg.key.participant
|
||||
if(category === 'peer') { // special peer message
|
||||
type = 'peer_msg'
|
||||
} else if(msg.key.fromMe) { // message was sent by us from a different device
|
||||
type = 'sender'
|
||||
// need to specially handle this case
|
||||
if(isJidUser(msg.key.remoteJid)) {
|
||||
participant = author
|
||||
|
||||
await sendReceipt(msg.key.remoteJid!, participant, [msg.key.id!], type)
|
||||
|
||||
|
||||
// send ack for history message
|
||||
const isAnyHistoryMsg = isHistoryMsg(msg.message)
|
||||
if(isAnyHistoryMsg) {
|
||||
const jid = jidEncode(jidDecode(msg.key.remoteJid!).user, 'c.us')
|
||||
await sendReceipt(jid, undefined, [msg.key.id], 'hist_sync')
|
||||
}
|
||||
} else if(!sendActiveReceipts) {
|
||||
type = 'inactive'
|
||||
}
|
||||
|
||||
await sendReceipt(msg.key.remoteJid!, participant, [msg.key.id!], type)
|
||||
cleanMessage(msg, authState.creds.me!.id)
|
||||
}
|
||||
),
|
||||
sendMessageAck(node)
|
||||
])
|
||||
|
||||
cleanMessage(msg, authState.creds.me!.id)
|
||||
ev.emit('messages.upsert', { messages: [msg], type: stanza.attrs.offline ? 'append' : 'notify' })
|
||||
}
|
||||
)
|
||||
.catch(
|
||||
error => onUnexpectedError(error, 'processing message')
|
||||
)
|
||||
})
|
||||
await upsertMessage(msg, node.attrs.offline ? 'append' : 'notify')
|
||||
}
|
||||
|
||||
ws.on('CB:call', async(node: BinaryNode) => {
|
||||
const handleCall = async(node: BinaryNode) => {
|
||||
const { attrs } = node
|
||||
const [infoChild] = getAllBinaryNodeChildren(node)
|
||||
const callId = infoChild.attrs['call-id']
|
||||
@@ -591,25 +506,62 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
||||
ev.emit('call', [call])
|
||||
|
||||
await sendMessageAck(node, { type: infoChild.tag })
|
||||
.catch(
|
||||
error => onUnexpectedError(error, 'ack call')
|
||||
)
|
||||
}
|
||||
|
||||
const handleBadAck = async({ attrs }: BinaryNode) => {
|
||||
// current hypothesis is that if pash is sent in the ack
|
||||
// it means -- the message hasn't reached all devices yet
|
||||
// we'll retry sending the message here
|
||||
if(attrs.phash) {
|
||||
logger.info({ attrs }, 'received phash in ack, resending message...')
|
||||
const key: WAMessageKey = { remoteJid: attrs.from, fromMe: true, id: attrs.id }
|
||||
const msg = await getMessage(key)
|
||||
if(msg) {
|
||||
await relayMessage(key.remoteJid, msg, { messageId: key.id, useUserDevicesCache: false })
|
||||
} else {
|
||||
logger.warn({ attrs }, 'could not send message again, as it was not found')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const flushBufferIfLastOfflineNode = (
|
||||
node: BinaryNode,
|
||||
identifier: string,
|
||||
exec: (node: BinaryNode) => Promise<any>
|
||||
) => {
|
||||
const task = exec(node)
|
||||
.catch(err => onUnexpectedError(err, identifier))
|
||||
const offline = node.attrs.offline
|
||||
if(offline) {
|
||||
ev.processInBuffer(task)
|
||||
}
|
||||
}
|
||||
|
||||
// called when all offline notifs are handled
|
||||
ws.on('CB:ib,,offline', (node: BinaryNode) => {
|
||||
const child = getBinaryNodeChild(node, 'offline')
|
||||
const offlineNotifs = +child.attrs.count
|
||||
|
||||
logger.info(`handled ${offlineNotifs} offline messages/notifications`)
|
||||
ev.emit('connection.update', { receivedPendingNotifications: true })
|
||||
ev.flush()
|
||||
})
|
||||
|
||||
// recv a message
|
||||
ws.on('CB:message', (node: BinaryNode) => {
|
||||
flushBufferIfLastOfflineNode(node, 'processing message', handleMessage)
|
||||
})
|
||||
|
||||
ws.on('CB:call', async(node: BinaryNode) => {
|
||||
flushBufferIfLastOfflineNode(node, 'handling call', handleCall)
|
||||
})
|
||||
|
||||
ws.on('CB:receipt', node => {
|
||||
handleReceipt(node)
|
||||
.catch(
|
||||
error => onUnexpectedError(error, 'handling receipt')
|
||||
)
|
||||
flushBufferIfLastOfflineNode(node, 'handling receipt', handleReceipt)
|
||||
})
|
||||
|
||||
ws.on('CB:notification', async(node: BinaryNode) => {
|
||||
handleNotification(node)
|
||||
.catch(
|
||||
error => {
|
||||
onUnexpectedError(error, 'handling notification')
|
||||
}
|
||||
)
|
||||
flushBufferIfLastOfflineNode(node, 'handling notification', handleNotification)
|
||||
})
|
||||
|
||||
ws.on('CB:ack,class:message', (node: BinaryNode) => {
|
||||
@@ -617,13 +569,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
||||
.catch(error => onUnexpectedError(error, 'handling bad ack'))
|
||||
})
|
||||
|
||||
ev.on('messages.upsert', data => {
|
||||
handleUpsertedMessages(data)
|
||||
.catch(
|
||||
error => onUnexpectedError(error, 'handling upserted messages')
|
||||
)
|
||||
})
|
||||
|
||||
ev.on('call', ([ call ]) => {
|
||||
// missed call + group call notification message generation
|
||||
if(call.status === 'timeout' || (call.status === 'offer' && call.isGroup)) {
|
||||
@@ -662,7 +607,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
|
||||
|
||||
return {
|
||||
...sock,
|
||||
processMessage: processMessageLocal,
|
||||
sendMessageAck,
|
||||
sendRetryRequest
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ import { WA_DEFAULT_EPHEMERAL } from '../Defaults'
|
||||
import { AnyMessageContent, MediaConnInfo, MessageReceiptType, MessageRelayOptions, MiscMessageGenerationOptions, SocketConfig, WAMessageKey } from '../Types'
|
||||
import { aggregateMessageKeysNotFromMe, assertMediaContent, bindWaitForEvent, decryptMediaRetryData, encodeWAMessage, encryptMediaRetryRequest, encryptSenderKeyMsgSignalProto, encryptSignalProto, extractDeviceJids, generateMessageID, generateWAMessage, getUrlFromDirectPath, getWAUploadToServer, jidToSignalProtocolAddress, parseAndInjectE2ESessions, unixTimestampSeconds } from '../Utils'
|
||||
import { getUrlInfo } from '../Utils/link-preview'
|
||||
import { areJidsSameUser, BinaryNode, BinaryNodeAttributes, getBinaryNodeChild, getBinaryNodeChildren, isJidGroup, isJidUser, jidDecode, jidEncode, jidNormalizedUser, JidWithDevice, reduceBinaryNodeToDictionary, S_WHATSAPP_NET } from '../WABinary'
|
||||
import { areJidsSameUser, BinaryNode, BinaryNodeAttributes, getBinaryNodeChild, getBinaryNodeChildren, isJidGroup, isJidUser, jidDecode, jidEncode, jidNormalizedUser, JidWithDevice, S_WHATSAPP_NET } from '../WABinary'
|
||||
import { makeGroupsSocket } from './groups'
|
||||
|
||||
export const makeMessagesSocket = (config: SocketConfig) => {
|
||||
@@ -15,7 +15,9 @@ export const makeMessagesSocket = (config: SocketConfig) => {
|
||||
const {
|
||||
ev,
|
||||
authState,
|
||||
upsertMessage,
|
||||
query,
|
||||
fetchPrivacySettings,
|
||||
generateMessageTag,
|
||||
sendNode,
|
||||
groupMetadata,
|
||||
@@ -26,26 +28,6 @@ export const makeMessagesSocket = (config: SocketConfig) => {
|
||||
stdTTL: 300, // 5 minutes
|
||||
useClones: false
|
||||
})
|
||||
let privacySettings: { [_: string]: string } | undefined
|
||||
|
||||
const fetchPrivacySettings = async(force: boolean = false) => {
|
||||
if(!privacySettings || force) {
|
||||
const { content } = await query({
|
||||
tag: 'iq',
|
||||
attrs: {
|
||||
xmlns: 'privacy',
|
||||
to: S_WHATSAPP_NET,
|
||||
type: 'get'
|
||||
},
|
||||
content: [
|
||||
{ tag: 'privacy', attrs: { } }
|
||||
]
|
||||
})
|
||||
privacySettings = reduceBinaryNodeToDictionary(content[0] as BinaryNode, 'category')
|
||||
}
|
||||
|
||||
return privacySettings
|
||||
}
|
||||
|
||||
let mediaConn: Promise<MediaConnInfo>
|
||||
const refreshMediaConn = async(forceGet = false) => {
|
||||
@@ -633,7 +615,7 @@ export const makeMessagesSocket = (config: SocketConfig) => {
|
||||
await relayMessage(jid, fullMsg.message, { messageId: fullMsg.key.id!, cachedGroupMetadata: options.cachedGroupMetadata, additionalAttributes })
|
||||
if(config.emitOwnEvents) {
|
||||
process.nextTick(() => {
|
||||
ev.emit('messages.upsert', { messages: [fullMsg], type: 'append' })
|
||||
upsertMessage(fullMsg, 'append')
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import { proto } from '../../WAProto'
|
||||
import { DEF_CALLBACK_PREFIX, DEF_TAG_PREFIX, DEFAULT_ORIGIN, INITIAL_PREKEY_COUNT, MIN_PREKEY_COUNT } from '../Defaults'
|
||||
import { AuthenticationCreds, BaileysEventEmitter, BaileysEventMap, DisconnectReason, SocketConfig } from '../Types'
|
||||
import { addTransactionCapability, bindWaitForConnectionUpdate, configureSuccessfulPairing, Curve, generateLoginNode, generateMdTagPrefix, generateRegistrationNode, getCodeFromWSError, getErrorCodeFromStreamError, getNextPreKeysNode, makeNoiseHandler, printQRIfNecessaryListener, promiseTimeout } from '../Utils'
|
||||
import { makeEventBuffer } from '../Utils/event-buffer'
|
||||
import { assertNodeErrorFree, BinaryNode, encodeBinaryNode, getBinaryNodeChild, getBinaryNodeChildren, S_WHATSAPP_NET } from '../WABinary'
|
||||
|
||||
/**
|
||||
@@ -34,7 +35,8 @@ export const makeSocket = ({
|
||||
agent
|
||||
})
|
||||
ws.setMaxListeners(0)
|
||||
const ev = new EventEmitter() as BaileysEventEmitter
|
||||
const _ev = new EventEmitter() as BaileysEventEmitter
|
||||
const ev = makeEventBuffer(_ev, logger)
|
||||
/** ephemeral key pair used to encrypt/decrypt communication. Unique for each connection */
|
||||
const ephemeralKeyPair = Curve.generateKeyPair()
|
||||
/** WA noise protocol wrapper */
|
||||
@@ -501,15 +503,6 @@ export const makeSocket = ({
|
||||
ev.emit('connection.update', { connection: 'open' })
|
||||
})
|
||||
|
||||
ws.on('CB:ib,,offline', (node: BinaryNode) => {
|
||||
const child = getBinaryNodeChild(node, 'offline')
|
||||
const offlineCount = +child.attrs.count
|
||||
|
||||
logger.info(`got ${offlineCount} offline messages/notifications`)
|
||||
|
||||
ev.emit('connection.update', { receivedPendingNotifications: true })
|
||||
})
|
||||
|
||||
ws.on('CB:stream:error', (node: BinaryNode) => {
|
||||
logger.error({ node }, 'stream errored out')
|
||||
|
||||
@@ -528,6 +521,8 @@ export const makeSocket = ({
|
||||
})
|
||||
|
||||
process.nextTick(() => {
|
||||
// start buffering important events
|
||||
ev.buffer()
|
||||
ev.emit('connection.update', { connection: 'connecting', receivedPendingNotifications: false, qr: undefined })
|
||||
})
|
||||
// update credentials when required
|
||||
@@ -572,7 +567,7 @@ export const makeSocket = ({
|
||||
onUnexpectedError,
|
||||
uploadPreKeys,
|
||||
/** Waits for the connection to WA to reach a state */
|
||||
waitForConnectionUpdate: bindWaitForConnectionUpdate(ev)
|
||||
waitForConnectionUpdate: bindWaitForConnectionUpdate(ev),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user