feat: add bulk "process" capability to BaileysBufferableEventEmitter

This commit is contained in:
Adhiraj Singh
2022-07-04 11:34:41 +05:30
parent de95694266
commit 5cc58d4aed
7 changed files with 190 additions and 128 deletions

View File

@@ -22,12 +22,10 @@ export const makeChatsSocket = (config: SocketConfig) => {
sendNode,
query,
onUnexpectedError,
emitEventsFromMap,
} = sock
let privacySettings: { [_: string]: string } | undefined
const mutationMutex = makeMutex()
/** this mutex ensures that the notifications (receipts, messages etc.) are processed in order */
const processingMutex = makeMutex()
/** cache to ensure new history sync events do not have duplicate items */
@@ -527,7 +525,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
logger.debug('resyncing main app state')
await (
mutationMutex.mutex(
processingMutex.mutex(
() => resyncAppState(ALL_WA_PATCH_NAMES, ctx)
)
.catch(err => (
@@ -546,7 +544,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
let initial: LTHashState
let encodeResult: { patch: proto.ISyncdPatch, state: LTHashState }
await mutationMutex.mutex(
await processingMutex.mutex(
async() => {
await authState.keys.transaction(
async() => {
@@ -694,12 +692,30 @@ export const makeChatsSocket = (config: SocketConfig) => {
])
}
const processMessageLocal = async(msg: proto.IWebMessageInfo) => {
const upsertMessage = async(msg: WAMessage, type: MessageUpsertType) => {
const startedBuffer = ev.buffer()
ev.emit('messages.upsert', { messages: [msg], type })
if(!!msg.pushName) {
let jid = msg.key.fromMe ? authState.creds.me!.id : (msg.key.participant || msg.key.remoteJid)
jid = jidNormalizedUser(jid)
if(!msg.key.fromMe) {
ev.emit('contacts.update', [{ id: jid, notify: msg.pushName, verifiedName: msg.verifiedBizName }])
}
// update our pushname too
if(msg.key.fromMe && authState.creds.me?.name !== msg.pushName) {
ev.emit('creds.update', { me: { ...authState.creds.me!, name: msg.pushName! } })
}
}
// process message and emit events
const newEvents = await processMessage(
await processMessage(
msg,
{
downloadHistory,
ev,
historyCache,
recvChats,
creds: authState.creds,
@@ -717,28 +733,9 @@ export const makeChatsSocket = (config: SocketConfig) => {
appStateSyncTimeout.start()
}
return newEvents
}
const upsertMessage = async(msg: WAMessage, type: MessageUpsertType) => {
ev.emit('messages.upsert', { messages: [msg], type })
if(!!msg.pushName) {
let jid = msg.key.fromMe ? authState.creds.me!.id : (msg.key.participant || msg.key.remoteJid)
jid = jidNormalizedUser(jid)
if(!msg.key.fromMe) {
ev.emit('contacts.update', [{ id: jid, notify: msg.pushName, verifiedName: msg.verifiedBizName }])
}
// update our pushname too
if(msg.key.fromMe && authState.creds.me?.name !== msg.pushName) {
ev.emit('creds.update', { me: { ...authState.creds.me!, name: msg.pushName! } })
}
if(startedBuffer) {
await ev.flush()
}
const events = await processMessageLocal(msg)
emitEventsFromMap(events)
}
ws.on('CB:presence', handlePresenceUpdate)
@@ -777,7 +774,6 @@ export const makeChatsSocket = (config: SocketConfig) => {
return {
...sock,
mutationMutex,
processingMutex,
fetchPrivacySettings,
upsertMessage,

View File

@@ -155,6 +155,8 @@ export const makeGroupsSocket = (config: SocketConfig) => {
admin: key.remoteJid!
}
}])
const started = ev.buffer()
// if we have the full message key
// update the invite message to be expired
if(key.id) {
@@ -193,6 +195,10 @@ export const makeGroupsSocket = (config: SocketConfig) => {
'notify'
)
if(started) {
await ev.flush()
}
return results.attrs.from
},
groupGetInviteInfo: async(code: string) => {

View File

@@ -20,7 +20,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
ev,
authState,
ws,
mutationMutex,
processingMutex,
upsertMessage,
resyncAppState,
@@ -249,7 +248,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
const update = getBinaryNodeChild(node, 'collection')
if(update) {
const name = update.attrs.name as WAPatchName
await mutationMutex.mutex(() => resyncAppState([name], undefined))
await resyncAppState([name], undefined)
}
}
@@ -591,10 +590,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
}
const protoMsg = proto.WebMessageInfo.fromObject(msg)
ev.emit(
'messages.upsert',
{ messages: [protoMsg], type: call.offline ? 'append' : 'notify' }
)
upsertMessage(protoMsg, call.offline ? 'append' : 'notify')
}
})

View File

@@ -1,10 +1,9 @@
import { Boom } from '@hapi/boom'
import EventEmitter from 'events'
import { promisify } from 'util'
import WebSocket from 'ws'
import { proto } from '../../WAProto'
import { DEF_CALLBACK_PREFIX, DEF_TAG_PREFIX, DEFAULT_ORIGIN, INITIAL_PREKEY_COUNT, MIN_PREKEY_COUNT } from '../Defaults'
import { AuthenticationCreds, BaileysEventEmitter, BaileysEventMap, DisconnectReason, SocketConfig } from '../Types'
import { DisconnectReason, SocketConfig } from '../Types'
import { addTransactionCapability, bindWaitForConnectionUpdate, configureSuccessfulPairing, Curve, generateLoginNode, generateMdTagPrefix, generateRegistrationNode, getCodeFromWSError, getErrorCodeFromStreamError, getNextPreKeysNode, makeNoiseHandler, printQRIfNecessaryListener, promiseTimeout } from '../Utils'
import { makeEventBuffer } from '../Utils/event-buffer'
import { assertNodeErrorFree, BinaryNode, encodeBinaryNode, getBinaryNodeChild, getBinaryNodeChildren, S_WHATSAPP_NET } from '../WABinary'
@@ -35,8 +34,7 @@ export const makeSocket = ({
agent
})
ws.setMaxListeners(0)
const _ev = new EventEmitter() as BaileysEventEmitter
const ev = makeEventBuffer(_ev, logger)
const ev = makeEventBuffer(logger)
/** ephemeral key pair used to encrypt/decrypt communication. Unique for each connection */
const ephemeralKeyPair = Curve.generateKeyPair()
/** WA noise protocol wrapper */
@@ -386,12 +384,6 @@ export const makeSocket = ({
})
)
const emitEventsFromMap = (map: Partial<BaileysEventMap<AuthenticationCreds>>) => {
for(const key in map) {
ev.emit(key as any, map[key])
}
}
/** logout & invalidate connection */
const logout = async() => {
const jid = authState.creds.me?.id
@@ -555,7 +547,6 @@ export const makeSocket = ({
get user() {
return authState.creds.me
},
emitEventsFromMap,
generateMessageTag,
query,
waitForMessage,