This commit is contained in:
Alan Mosko
2023-01-17 11:29:10 -03:00
parent c398aa3ca4
commit 05dd53e2ce
12 changed files with 217 additions and 106 deletions

View File

@@ -285,7 +285,7 @@ async function findAppModules() {
const indentation = moduleIndentationMap[info.type]?.indentation
let typeName = unnestName(info.type)
if(indentation !== parentName && indentation) {
typeName = `${indentation.replace(/\$/g, '.')}.${typeName}`
typeName = `${indentation.replaceAll('$', '.')}.${typeName}`
}
// if(info.enumValues) {

View File

@@ -1,3 +1,3 @@
{
"version": [2, 2244, 6]
"version": [2, 2243, 7]
}

View File

@@ -1,6 +1,6 @@
import { ProductCreate, ProductUpdate, SocketConfig } from '../Types'
import { GetCatalogOptions, ProductCreate, ProductUpdate, SocketConfig } from '../Types'
import { parseCatalogNode, parseCollectionsNode, parseOrderDetailsNode, parseProductNode, toProductNode, uploadingNecessaryImagesOfProduct } from '../Utils/business'
import { jidNormalizedUser, S_WHATSAPP_NET } from '../WABinary'
import { BinaryNode, jidNormalizedUser, S_WHATSAPP_NET } from '../WABinary'
import { getBinaryNodeChild } from '../WABinary/generic-utils'
import { makeMessagesRecvSocket } from './messages-recv'
@@ -12,9 +12,36 @@ export const makeBusinessSocket = (config: SocketConfig) => {
waUploadToServer
} = sock
const getCatalog = async(jid?: string, limit = 10) => {
const getCatalog = async({ jid, limit, cursor }: GetCatalogOptions) => {
jid = jid || authState.creds.me?.id
jid = jidNormalizedUser(jid!)
const queryParamNodes: BinaryNode[] = [
{
tag: 'limit',
attrs: { },
content: Buffer.from((limit || 10).toString())
},
{
tag: 'width',
attrs: { },
content: Buffer.from('100')
},
{
tag: 'height',
attrs: { },
content: Buffer.from('100')
},
]
if(cursor) {
queryParamNodes.push({
tag: 'after',
attrs: { },
content: cursor
})
}
const result = await query({
tag: 'iq',
attrs: {
@@ -29,23 +56,7 @@ export const makeBusinessSocket = (config: SocketConfig) => {
jid,
allow_shop_source: 'true'
},
content: [
{
tag: 'limit',
attrs: { },
content: Buffer.from(limit.toString())
},
{
tag: 'width',
attrs: { },
content: Buffer.from('100')
},
{
tag: 'height',
attrs: { },
content: Buffer.from('100')
}
]
content: queryParamNodes
}
]
})

View File

@@ -494,14 +494,27 @@ export const makeChatsSocket = (config: SocketConfig) => {
}
}
const presenceSubscribe = (toJid: string) => (
/**
* @param toJid the jid to subscribe to
* @param tcToken token for subscription, use if present
*/
const presenceSubscribe = (toJid: string, tcToken?: Buffer) => (
sendNode({
tag: 'presence',
attrs: {
to: toJid,
id: generateMessageTag(),
type: 'subscribe'
}
},
content: tcToken
? [
{
tag: 'tctoken',
attrs: { },
content: tcToken
}
]
: undefined
})
)
@@ -725,7 +738,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
)
: false
if(shouldProcessHistoryMsg && !authState.creds.myAppStateKeyId) {
if(historyMsg && !authState.creds.myAppStateKeyId) {
logger.warn('skipping app state sync, as myAppStateKeyId is not set')
pendingAppStateSync = true
}
@@ -733,7 +746,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
await Promise.all([
(async() => {
if(
shouldProcessHistoryMsg
historyMsg
&& authState.creds.myAppStateKeyId
) {
pendingAppStateSync = false
@@ -822,7 +835,8 @@ export const makeChatsSocket = (config: SocketConfig) => {
// we keep buffering events until we finally have
// the key and can sync the messages
if(!authState.creds?.myAppStateKeyId) {
needToFlushWithAppStateSync = ev.buffer()
ev.buffer()
needToFlushWithAppStateSync = true
}
}
})

View File

@@ -268,6 +268,21 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
const from = jidNormalizedUser(node.attrs.from)
switch (nodeType) {
case 'privacy_token':
const tokenList = getBinaryNodeChildren(child, 'token')
for(const { attrs, content } of tokenList) {
const jid = attrs.jid
ev.emit('chats.update', [
{
id: jid,
tcToken: content as Buffer
}
])
logger.debug({ jid }, 'got privacy token update')
}
break
case 'w:gp2':
handleGroupNotification(node.attrs.participant, child, result)
break
@@ -645,16 +660,9 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
identifier: string,
exec: (node: BinaryNode) => Promise<any>
) => {
const started = ev.buffer()
if(started) {
await execTask()
if(started) {
await ev.flush()
}
} else {
const task = execTask()
ev.processInBuffer(task)
}
ev.buffer()
await execTask()
ev.flush()
function execTask() {
return exec(node)
@@ -662,17 +670,6 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
}
}
// called when all offline notifs are handled
ws.on('CB:ib,,offline', async(node: BinaryNode) => {
const child = getBinaryNodeChild(node, 'offline')
const offlineNotifs = +(child?.attrs.count || 0)
logger.info(`handled ${offlineNotifs} offline messages/notifications`)
await ev.flush()
ev.emit('connection.update', { receivedPendingNotifications: true })
})
// recv a message
ws.on('CB:message', (node: BinaryNode) => {
processNodeWithBuffer(node, 'processing message', handleMessage)

View File

@@ -532,11 +532,32 @@ export const makeSocket = ({
end(new Boom('Multi-device beta not joined', { statusCode: DisconnectReason.multideviceMismatch }))
})
let didStartBuffer = false
process.nextTick(() => {
// start buffering important events
ev.buffer()
if(creds.me?.id) {
// start buffering important events
// if we're logged in
ev.buffer()
didStartBuffer = true
}
ev.emit('connection.update', { connection: 'connecting', receivedPendingNotifications: false, qr: undefined })
})
// called when all offline notifs are handled
ws.on('CB:ib,,offline', (node: BinaryNode) => {
const child = getBinaryNodeChild(node, 'offline')
const offlineNotifs = +(child?.attrs.count || 0)
logger.info(`handled ${offlineNotifs} offline messages/notifications`)
if(didStartBuffer) {
ev.flush()
logger.trace('flushed events for initial buffer')
}
ev.emit('connection.update', { receivedPendingNotifications: true })
})
// update credentials when required
ev.on('creds.update', update => {
const name = update.me?.name

View File

@@ -22,16 +22,25 @@ describe('Event Buffer Tests', () => {
ev.on('chats.update', () => fail('should not emit update event'))
ev.buffer()
ev.processInBuffer((async() => {
await delay(100)
ev.emit('chats.upsert', [{ id: chatId, conversationTimestamp: 123, unreadCount: 1 }])
})())
ev.processInBuffer((async() => {
await delay(200)
ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 124, unreadCount: 1 }])
})())
await Promise.all([
(async() => {
ev.buffer()
await delay(100)
ev.emit('chats.upsert', [{ id: chatId, conversationTimestamp: 123, unreadCount: 1 }])
const flushed = ev.flush()
expect(flushed).toBeFalsy()
})(),
(async() => {
ev.buffer()
await delay(200)
ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 124, unreadCount: 1 }])
const flushed = ev.flush()
expect(flushed).toBeFalsy()
})()
])
await ev.flush()
const flushed = ev.flush()
expect(flushed).toBeTruthy()
expect(chats).toHaveLength(1)
expect(chats[0].conversationTimestamp).toEqual(124)
@@ -51,7 +60,7 @@ describe('Event Buffer Tests', () => {
ev.emit('chats.delete', [chatId])
ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 124, unreadCount: 1 }])
await ev.flush()
ev.flush()
expect(chats).toHaveLength(1)
})
@@ -68,7 +77,7 @@ describe('Event Buffer Tests', () => {
ev.emit('chats.update', [{ id: chatId, conversationTimestamp: 123, unreadCount: 1 }])
ev.emit('chats.delete', [chatId])
await ev.flush()
ev.flush()
expect(chatsDeleted).toHaveLength(1)
})
@@ -103,7 +112,7 @@ describe('Event Buffer Tests', () => {
}
}])
await ev.flush()
ev.flush()
ev.buffer()
ev.emit('chats.upsert', [{
@@ -123,7 +132,7 @@ describe('Event Buffer Tests', () => {
messages: [],
isLatest: false
})
await ev.flush()
ev.flush()
expect(chatsUpserted).toHaveLength(1)
expect(chatsUpserted[0].id).toEqual(chatId)
@@ -159,7 +168,7 @@ describe('Event Buffer Tests', () => {
muteEndTime: 123
}])
await ev.flush()
ev.flush()
expect(chatsUpserted).toHaveLength(1)
expect(chatsUpserted[0].archived).toBeUndefined()
@@ -184,7 +193,7 @@ describe('Event Buffer Tests', () => {
})
ev.emit('chats.update', [{ id: chatId, archived: true }])
await ev.flush()
ev.flush()
expect(chatRecv).toBeDefined()
expect(chatRecv?.archived).toBeTruthy()
@@ -218,7 +227,7 @@ describe('Event Buffer Tests', () => {
ev.emit('messages.upsert', { messages: [proto.WebMessageInfo.fromObject(msg)], type: 'notify' })
ev.emit('messages.update', [{ key: msg.key, update: { status: WAMessageStatus.READ } }])
await ev.flush()
ev.flush()
expect(msgs).toHaveLength(1)
expect(msgs[0].message).toBeTruthy()
@@ -254,7 +263,7 @@ describe('Event Buffer Tests', () => {
}
])
await ev.flush()
ev.flush()
expect(msgs).toHaveLength(1)
expect(msgs[0].userReceipt).toHaveLength(1)
@@ -275,7 +284,7 @@ describe('Event Buffer Tests', () => {
ev.emit('messages.update', [{ key, update: { status: WAMessageStatus.DELIVERY_ACK } }])
ev.emit('messages.update', [{ key, update: { status: WAMessageStatus.READ } }])
await ev.flush()
ev.flush()
expect(msgs).toHaveLength(1)
expect(msgs[0].update.status).toEqual(WAMessageStatus.READ)
@@ -303,7 +312,7 @@ describe('Event Buffer Tests', () => {
ev.emit('chats.update', [{ id: msg.key.remoteJid!, unreadCount: 1, conversationTimestamp: msg.messageTimestamp }])
ev.emit('messages.update', [{ key: msg.key, update: { status: WAMessageStatus.READ } }])
await ev.flush()
ev.flush()
expect(chats[0].unreadCount).toBeUndefined()
})

View File

@@ -0,0 +1,37 @@
import { WAMessageContent } from '../Types'
import { normalizeMessageContent } from '../Utils'
describe('Messages Tests', () => {
	it('should correctly unwrap messages', () => {
		const CONTENT = { imageMessage: { } }

		// every wrapped form below must normalize down to the inner image content
		const wrappedVariants: WAMessageContent[] = [
			// bare content, no wrapping
			CONTENT,
			// single ephemeral wrap
			{ ephemeralMessage: { message: CONTENT } },
			// view-once around ephemeral
			{
				viewOnceMessage: {
					message: {
						ephemeralMessage: { message: CONTENT }
					}
				}
			},
			// view-once → view-once-v2 → ephemeral
			{
				viewOnceMessage: {
					message: {
						viewOnceMessageV2: {
							message: {
								ephemeralMessage: { message: CONTENT }
							}
						}
					}
				}
			}
		]

		for(const content of wrappedVariants) {
			expect(
				normalizeMessageContent(content)
			).toHaveProperty('imageMessage')
		}
	})
})

View File

@@ -70,4 +70,15 @@ export type OrderProduct = {
export type OrderDetails = {
price: OrderPrice
products: OrderProduct[]
}
/** opaque pagination cursor returned by a catalog query */
export type CatalogCursor = string

/** options for fetching a business's product catalog */
export type GetCatalogOptions = {
	/** cursor to start from */
	cursor?: CatalogCursor
	/** number of products to fetch */
	limit?: number
	// jid of the business whose catalog is fetched;
	// when omitted, getCatalog falls back to the user's own jid
	jid?: string
}

View File

@@ -7,7 +7,14 @@ import { getStream, getUrlFromDirectPath, toReadable } from './messages-media'
/**
 * Parse a catalog query result node into a product list
 * plus a cursor for fetching the next page (if any).
 */
export const parseCatalogNode = (node: BinaryNode) => {
	const catalogNode = getBinaryNodeChild(node, 'product_catalog')
	const products = getBinaryNodeChildren(catalogNode, 'product').map(parseProductNode)
	// the optional <paging> child carries the cursor for the next page
	const paging = getBinaryNodeChild(catalogNode, 'paging')
	return {
		products,
		nextPageCursor: paging
			? getBinaryNodeChildString(paging, 'after')
			: undefined
	}
}
export const parseCollectionsNode = (node: BinaryNode) => {

View File

@@ -39,15 +39,16 @@ type BaileysBufferableEventEmitter = BaileysEventEmitter & {
process(handler: (events: BaileysEventData) => void | Promise<void>): (() => void)
/**
* starts buffering events, call flush() to release them
* @returns true if buffering just started, false if it was already buffering
* */
buffer(): boolean
buffer(): void
/** buffers all events till the promise completes */
createBufferedFunction<A extends any[], T>(work: (...args: A) => Promise<T>): ((...args: A) => Promise<T>)
/** flushes all buffered events */
flush(): Promise<void>
/** waits for the task to complete, before releasing the buffer */
processInBuffer(task: Promise<any>)
/**
* flushes all buffered events
* @param force if true, will flush all data regardless of any pending buffers
* @returns returns true if the flush actually happened, otherwise false
*/
flush(force?: boolean): boolean
/** is there an ongoing buffer */
isBuffering(): boolean
}
@@ -62,9 +63,7 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter =
const historyCache = new Set<string>()
let data = makeBufferData()
let isBuffering = false
let preBufferTask: Promise<any> = Promise.resolve()
let preBufferTraces: string[] = []
let buffersInProgress = 0
// take the generic event and fire it as a baileys event
ev.on('event', (map: BaileysEventData) => {
@@ -74,25 +73,24 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter =
})
// starts (or nests) an event buffer; each call must be paired
// with a flush() — events are only released once every buffer
// opened this way has been flushed (see buffersInProgress)
function buffer() {
	buffersInProgress += 1
}
async function flush() {
if(!isBuffering) {
return
function flush(force = false) {
// no buffer going on
if(!buffersInProgress) {
return false
}
logger.trace({ preBufferTraces }, 'releasing buffered events...')
await preBufferTask
preBufferTraces = []
isBuffering = false
if(!force) {
// reduce the number of buffers in progress
buffersInProgress -= 1
// if there are still some buffers going on
// then we don't flush now
if(buffersInProgress) {
return false
}
}
const newData = makeBufferData()
const chatUpdates = Object.values(data.chatUpdates)
@@ -117,6 +115,8 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter =
{ conditionalChatUpdatesLeft },
'released buffered events'
)
return true
}
return {
@@ -131,34 +131,26 @@ export const makeEventBuffer = (logger: Logger): BaileysBufferableEventEmitter =
}
},
emit<T extends BaileysEvent>(event: BaileysEvent, evData: BaileysEventMap[T]) {
if(isBuffering && BUFFERABLE_EVENT_SET.has(event)) {
if(buffersInProgress && BUFFERABLE_EVENT_SET.has(event)) {
append(data, historyCache, event as any, evData, logger)
return true
}
return ev.emit('event', { [event]: evData })
},
processInBuffer(task) {
if(isBuffering) {
preBufferTask = Promise.allSettled([ preBufferTask, task ])
preBufferTraces.push(new Error('').stack!)
}
},
isBuffering() {
return isBuffering
return buffersInProgress > 0
},
buffer,
flush,
createBufferedFunction(work) {
return async(...args) => {
const started = buffer()
buffer()
try {
const result = await work(...args)
return result
} finally {
if(started) {
await flush()
}
flush()
}
}
},

View File

@@ -1,8 +1,20 @@
import logger from './logger'
const MUTEX_TIMEOUT_MS = 60_000
export const makeMutex = () => {
let task = Promise.resolve() as Promise<any>
let taskTimeout: NodeJS.Timeout | undefined
return {
mutex<T>(code: () => Promise<T> | T): Promise<T> {
task = (async() => {
const stack = new Error('mutex start').stack
let waitOver = false
taskTimeout = setTimeout(() => {
logger.warn({ stack, waitOver }, 'possible mutex deadlock')
}, MUTEX_TIMEOUT_MS)
// wait for the previous task to complete
// if there is an error, we swallow so as to not block the queue
try {