From fb66733b61ddb1a0c851d621de59ee251770eb47 Mon Sep 17 00:00:00 2001 From: Adhiraj Singh Date: Wed, 19 Jan 2022 21:35:59 +0530 Subject: [PATCH] feat: implement in memory store 1. the store works as a temporary store for connection data such as chats, messages & contacts 2. the store is primarily meant to illustrate the usage of the event emitter as a way to construct the state of the connection. This will likely be very inefficient to perform well at scale 3. the store is meant to be a quick way to have some visibility of data while testing 4. the store works for both legacy & MD connections --- .gitignore | 1 + Example/example-legacy.ts | 12 +- Example/example.ts | 13 +- package.json | 4 +- src/Store/index.ts | 2 + src/Store/make-in-memory-store.ts | 374 +++++++++++++++++++++++++++ src/Store/make-ordered-dictionary.ts | 86 ++++++ src/index.ts | 2 +- yarn.lock | 5 + 9 files changed, 495 insertions(+), 4 deletions(-) create mode 100644 src/Store/index.ts create mode 100644 src/Store/make-in-memory-store.ts diff --git a/.gitignore b/.gitignore index ee4cfa5..b96baf0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ node_modules auth_info*.json +baileys_store*.json output.csv */.DS_Store .DS_Store diff --git a/Example/example-legacy.ts b/Example/example-legacy.ts index 76bb044..dc4c8fe 100644 --- a/Example/example-legacy.ts +++ b/Example/example-legacy.ts @@ -1,6 +1,15 @@ import P from "pino" import { Boom } from "@hapi/boom" -import { makeWALegacySocket, DisconnectReason, AnyMessageContent, delay, useSingleFileLegacyAuthState } from '../src' +import { makeWALegacySocket, DisconnectReason, AnyMessageContent, delay, useSingleFileLegacyAuthState, makeInMemoryStore } from '../src' + +// the store maintains the data of the WA connection in memory +// can be written out to a file & read from it +const store = makeInMemoryStore({ logger: P().child({ level: 'debug', stream: 'store' }) }) +store.readFromFile('./baileys_store.json') + +setInterval(() => { + store.writeToFile('./baileys_store.json') +}, 10_000) const { state, saveState } = useSingleFileLegacyAuthState('./auth_info.json') @@ -12,6 +21,7 @@ const startSock = () => { printQRInTerminal: true, auth: state }) + store.bind(sock.ev) const sendMessageWTyping = async(msg: AnyMessageContent, jid: string) => { await sock.presenceSubscribe(jid) diff --git a/Example/example.ts b/Example/example.ts index 7dcc2be..ab69d13 100644 --- a/Example/example.ts +++ b/Example/example.ts @@ -1,6 +1,15 @@ import P from "pino" import { Boom } from "@hapi/boom" -import makeWASocket, { DisconnectReason, AnyMessageContent, delay, useSingleFileAuthState } from '../src' +import makeWASocket, { DisconnectReason, AnyMessageContent, delay, useSingleFileAuthState, makeInMemoryStore } from '../src' + +// the store maintains the data of the WA connection in memory +// can be written out to a file & read from it +const store = makeInMemoryStore({ logger: P().child({ level: 'debug', stream: 'store' }) }) +store.readFromFile('./baileys_store.json') + +setInterval(() => { + store.writeToFile('./baileys_store.json') +}, 10_000) const { state, saveState } = useSingleFileAuthState('./auth_info_multi.json') @@ -19,6 +28,8 @@ const startSock = () => { } }) + store.bind(sock.ev) + const sendMessageWTyping = async(msg: AnyMessageContent, jid: string) => { await sock.presenceSubscribe(jid) await delay(500) diff --git a/package.json b/package.json index 50d112b..c7622c2 100644 --- a/package.json +++ b/package.json @@ -48,7 +48,8 @@ "peerDependencies": { "jimp": "^0.16.1", "qrcode-terminal": "^0.12.0", - "sharp": "^0.29.3" + "sharp": "^0.29.3", + "@adiwajshing/keyed-db": "^0.2.4" }, "files": [ "lib/*", @@ -58,6 +59,7 @@ ], "devDependencies": { "@adiwajshing/eslint-config": "git+https://github.com/adiwajshing/eslint-config", + "@adiwajshing/keyed-db": "^0.2.4", "@types/got": "^9.6.11", "@types/jest": "^26.0.24", "@types/node": "^14.6.2", diff --git a/src/Store/index.ts b/src/Store/index.ts new file mode 100644 index 0000000..f744671 --- /dev/null +++ b/src/Store/index.ts @@ -0,0 +1,2 @@ +import makeInMemoryStore from './make-in-memory-store' +export { makeInMemoryStore } \ No newline at end of file diff --git a/src/Store/make-in-memory-store.ts b/src/Store/make-in-memory-store.ts new file mode 100644 index 0000000..5a3be5e --- /dev/null +++ b/src/Store/make-in-memory-store.ts @@ -0,0 +1,374 @@ +import type KeyedDB from '@adiwajshing/keyed-db' +import type { Comparable } from '@adiwajshing/keyed-db/lib/Types' +import type { Logger } from 'pino' +import { proto } from '../../WAProto' +import { DEFAULT_CONNECTION_CONFIG } from '../Defaults' +import type makeLegacySocket from '../LegacySocket' +import type makeMDSocket from '../Socket' +import type { BaileysEventEmitter, Chat, ConnectionState, Contact, GroupMetadata, MessageInfo, PresenceData, WAMessage, WAMessageCursor, WAMessageKey } from '../Types' +import { toNumber } from '../Utils' +import { jidNormalizedUser } from '../WABinary' +import makeOrderedDictionary from './make-ordered-dictionary' + +type LegacyWASocket = ReturnType +type AnyWASocket = ReturnType + +export const waChatKey = (pin: boolean) => ({ + key: (c: Chat) => (pin ? (c.pin ? '1' : '0') : '') + (c.archive ? '0' : '1') + c.conversationTimestamp.toString(16).padStart(8, '0') + c.id, + compare: (k1: string, k2: string) => k2.localeCompare (k1) +}) + +export const waMessageID = (m: WAMessage) => m.key.id + +export type BaileysInMemoryStoreConfig = { + chatKey?: Comparable + logger?: Logger +} + +const makeMessagesDictionary = () => makeOrderedDictionary(waMessageID) + +export default ( + { logger, chatKey }: BaileysInMemoryStoreConfig +) => { + logger = logger || DEFAULT_CONNECTION_CONFIG.logger.child({ stream: 'in-mem-store' }) + chatKey = chatKey || waChatKey(true) + const KeyedDB = require('@adiwajshing/keyed-db').default as new (...args: any[]) => KeyedDB + + const chats = new KeyedDB(chatKey, c => c.id) + const messages: { [_: string]: ReturnType } = { } + const contacts: { [_: string]: Contact } = { } + const groupMetadata: { [_: string]: GroupMetadata } = { } + const messageInfos: { [id: string]: MessageInfo } = { } + const presences: { [id: string]: { [participant: string]: PresenceData } } = { } + const state: ConnectionState = { connection: 'close' } + + const assertMessageList = (jid: string) => { + if(!messages[jid]) { + messages[jid] = makeMessagesDictionary() + } + + return messages[jid] + } + + const contactsUpsert = (newContacts: Contact[]) => { + const oldContacts = new Set(Object.keys(contacts)) + for(const contact of newContacts) { + oldContacts.delete(contact.id) + contacts[contact.id] = Object.assign( + contacts[contact.id] || {}, + contact + ) + } + + return oldContacts + } + + /** + * binds to a BaileysEventEmitter. + * It listens to all events and constructs a state that you can query accurate data from. + * Eg. can use the store to fetch chats, contacts, messages etc. + * @param ev typically the event emitter from the socket connection + */ + const bind = (ev: BaileysEventEmitter) => { + ev.on('connection.update', update => { + Object.assign(state, update) + }) + ev.on('chats.set', ({ chats: newChats, isLatest }) => { + if(isLatest) { + chats.clear() + } + + const chatsAdded = chats.insertIfAbsent(...newChats).length + logger.debug({ chatsAdded }, 'synced chats') + }) + ev.on('contacts.set', ({ contacts: newContacts }) => { + const oldContacts = contactsUpsert(newContacts) + for(const jid of oldContacts) { + delete contacts[jid] + } + + logger.debug({ deletedContacts: oldContacts.size, newContacts }, 'synced contacts') + }) + ev.on('messages.set', ({ messages: newMessages, isLatest }) => { + if(isLatest) { + for(const id in messages) { + delete messages[id] + } + } + + for(const msg of newMessages) { + const jid = msg.key.remoteJid! + const list = assertMessageList(jid) + list.upsert(msg, 'prepend') + } + + logger.debug({ messages: newMessages.length }, 'synced messages') + }) + ev.on('contacts.update', updates => { + for(const update of updates) { + if(contacts[update.id!]) { + Object.assign(contacts[update.id!], update) + } else { + logger.debug({ update }, 'got update for non-existant contact') + } + } + }) + ev.on('chats.upsert', newChats => { + chats.upsert(...newChats) + }) + ev.on('chats.update', updates => { + for(const update of updates) { + const result = chats.update(update.id!, chat => { + if(update.unreadCount > 0) { + update.unreadCount = chat.unreadCount + update.unreadCount + } + + Object.assign(chat, update) + }) + if(!result) { + logger.debug({ update }, 'got update for non-existant chat') + } + } + }) + ev.on('presence.update', ({ id, presences: update }) => { + presences[id] = presences[id] || {} + Object.assign(presences[id], update) + }) + ev.on('chats.delete', deletions => { + for(const item of deletions) { + chats.deleteById(item) + } + }) + ev.on('messages.upsert', ({ messages: newMessages, type }) => { + switch (type) { + case 'append': + case 'notify': + for(const msg of newMessages) { + const jid = jidNormalizedUser(msg.key.remoteJid!) + const list = assertMessageList(jid) + list.upsert(msg, 'append') + + if(type === 'notify') { + if(!chats.get(jid)) { + ev.emit('chats.upsert', [ + { + id: jid, + conversationTimestamp: toNumber(msg.messageTimestamp), + unreadCount: 1 + } + ]) + } + + // add message infos if required + messageInfos[msg.key.id!] = messageInfos[msg.key.id!] || { reads: {}, deliveries: {} } + } + } + + break + } + }) + ev.on('messages.update', updates => { + for(const { update, key } of updates) { + const list = assertMessageList(key.remoteJid) + const result = list.updateAssign(key.id, update) + if(!result) { + logger.debug({ update }, 'got update for non-existent message') + } + } + }) + ev.on('messages.delete', item => { + if('all' in item) { + const list = messages[item.jid] + list?.clear() + } else { + const jid = item.keys[0].remoteJid + const list = messages[jid] + if(list) { + const idSet = new Set(item.keys.map(k => k.id)) + list.filter(m => !idSet.has(m.key.id)) + } + } + }) + + ev.on('groups.update', updates => { + for(const update of updates) { + if(groupMetadata[update.id]) { + Object.assign(groupMetadata[update.id!], update) + } else { + logger.debug({ update }, 'got update for non-existant group metadata') + } + } + }) + + ev.on('group-participants.update', ({ id, participants, action }) => { + const metadata = groupMetadata[id] + if(metadata) { + switch (action) { + case 'add': + metadata.participants.push(...participants.map(id => ({ id, isAdmin: false, isSuperAdmin: false }))) + break + case 'demote': + case 'promote': + for(const participant of metadata.participants) { + if(participants.includes(participant.id)) { + participant.isAdmin = action === 'promote' + } + } + + break + case 'remove': + metadata.participants = metadata.participants.filter(p => !participants.includes(p.id)) + break + } + } + }) + + ev.on('message-info.update', updates => { + for(const { key, update } of updates) { + const obj = messageInfos[key.id!] + if(obj) { + // add reads/deliveries + for(const key in update) { + Object.assign(obj[key], update[key]) + } + } + } + }) + } + + const toJSON = () => ({ + chats, + contacts, + messages + }) + + const fromJSON = (json: { chats: Chat[], contacts: { [id: string]: Contact }, messages: { [id: string]: WAMessage[] } }) => { + chats.upsert(...json.chats) + contactsUpsert(Object.values(contacts)) + for(const jid in json.messages) { + const list = assertMessageList(jid) + for(const msg of json.messages[jid]) { + list.upsert(proto.WebMessageInfo.fromObject(msg), 'append') + } + } + } + + + return { + chats, + contacts, + messages, + groupMetadata, + messageInfos, + state, + presences, + bind, + loadMessages: async(jid: string, count: number, cursor: WAMessageCursor, sock: LegacyWASocket | undefined) => { + const list = assertMessageList(jid) + const retrieve = async(count: number, cursor: WAMessageCursor) => { + const result = await sock?.fetchMessagesFromWA(jid, count, cursor) + return result || [] + } + + const mode = !cursor || 'before' in cursor ? 'before' : 'after' + const cursorKey = !!cursor ? ('before' in cursor ? cursor.before : cursor.after) : undefined + const cursorValue = cursorKey ? list.get(cursorKey.id) : undefined + + let messages: WAMessage[] + if(list && mode === 'before' && (!cursorKey || cursorValue)) { + if(cursorValue) { + const msgIdx = list.array.findIndex(m => m.key.id === cursorKey.id) + messages = list.array.slice(0, msgIdx) + } else { + messages = list.array + } + + const diff = count - messages.length + if(diff < 0) { + messages = messages.slice(-count) // get the last X messages + } else if(diff > 0) { + const [fMessage] = messages + const cursor = { before: fMessage?.key || cursorKey } + const extra = await retrieve (diff, cursor) + // add to DB + for(let i = extra.length-1; i >= 0;i--) { + list.upsert(extra[i], 'prepend') + } + + messages.splice(0, 0, ...extra) + } + } else { + messages = await retrieve(count, cursor) + } + + return messages + }, + loadMessage: async(jid: string, id: string, sock: LegacyWASocket | undefined) => { + let message = messages[jid]?.get(id) + if(!message) { + message = await sock?.loadMessageFromWA(jid, id) + } + + return message + }, + mostRecentMessage: async(jid: string, sock: LegacyWASocket | undefined) => { + let message = messages[jid]?.array.slice(-1)[0] + if(!message) { + const [result] = await sock?.fetchMessagesFromWA(jid, 1, undefined) + message = result + } + + return message + }, + fetchImageUrl: async(jid: string, sock: AnyWASocket | undefined) => { + const contact = contacts[jid] + if(!contact) { + return sock?.profilePictureUrl(jid) + } + + if(typeof contact.imgUrl === 'undefined') { + contact.imgUrl = await sock?.profilePictureUrl(jid) + } + + return contact.imgUrl + }, + fetchGroupMetadata: async(jid: string, sock: AnyWASocket | undefined) => { + if(!groupMetadata[jid]) { + groupMetadata[jid] = await sock?.groupMetadata(jid) + } + + return groupMetadata[jid] + }, + fetchBroadcastListInfo: async(jid: string, sock: LegacyWASocket | undefined) => { + if(!groupMetadata[jid]) { + groupMetadata[jid] = await sock?.getBroadcastListInfo(jid) + } + + return groupMetadata[jid] + }, + fetchMessageInfo: async({ remoteJid, id }: WAMessageKey, sock: LegacyWASocket | undefined) => { + if(!messageInfos[id!]) { + messageInfos[id!] = await sock?.messageInfo(remoteJid, id) + } + + return messageInfos[id!] + }, + toJSON, + fromJSON, + writeToFile: (path: string) => { + // require fs here so that in case "fs" is not available -- the app does not crash + const { writeFileSync } = require('fs') + writeFileSync(path, JSON.stringify(toJSON())) + }, + readFromFile: (path: string) => { + // require fs here so that in case "fs" is not available -- the app does not crash + const { readFileSync, existsSync } = require('fs') + if(existsSync(path)) { + logger.debug({ path }, 'reading from file') + const jsonStr = readFileSync(path, { encoding: 'utf-8' }) + const json = JSON.parse(jsonStr) + fromJSON(json) + } + } + } +} \ No newline at end of file diff --git a/src/Store/make-ordered-dictionary.ts b/src/Store/make-ordered-dictionary.ts index e69de29..b66d760 100644 --- a/src/Store/make-ordered-dictionary.ts +++ b/src/Store/make-ordered-dictionary.ts @@ -0,0 +1,86 @@ +function makeOrderedDictionary(idGetter: (item: T) => string) { + const array: T[] = [] + const dict: { [_: string]: T } = { } + + const get = (id: string) => dict[id] + + const update = (item: T) => { + const id = idGetter(item) + const idx = array.findIndex(i => idGetter(i) === id) + if(idx >= 0) { + array[idx] = item + dict[id] = item + } + + return false + } + + const upsert = (item: T, mode: 'append' | 'prepend') => { + const id = idGetter(item) + if(get(id)) { + update(item) + } else { + if(mode === 'append') { + array.push(item) + } else { + array.splice(0, 0, item) + } + + dict[id] = item + } + } + + const remove = (item: T) => { + const id = idGetter(item) + const idx = array.findIndex(i => idGetter(i) === id) + if(idx >= 0) { + array.splice(idx, 1) + delete dict[id] + return true + } + + return false + } + + return { + array, + get, + upsert, + update, + remove, + updateAssign: (id: string, update: Partial) => { + const item = get(id) + if(item) { + Object.assign(item, update) + delete dict[id] + dict[idGetter(item)] = item + return true + } + + return false + }, + clear: () => { + array.splice(0, array.length) + Object.keys(dict).forEach(key => { + delete dict[key] + }) + }, + filter: (contain: (item: T) => boolean) => { + let i = 0 + while(i < array.length) { + if(!contain(array[i])) { + delete dict[idGetter(array[i])] + array.splice(i, 1) + } else { + i += 1 + } + } + }, + toJSON: () => array, + fromJSON: (newItems: T[]) => { + array.splice(0, array.length, ...newItems) + } + } +} + +export default makeOrderedDictionary \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index d52f14b..4fcdeb2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,7 +4,7 @@ import makeWASocket from './Socket' export * from '../WAProto' export * from './Utils' export * from './Types' -//export * from './Store' +export * from './Store' export * from './Defaults' export * from './WABinary' diff --git a/yarn.lock b/yarn.lock index 950ff72..54b0e73 100644 --- a/yarn.lock +++ b/yarn.lock @@ -12,6 +12,11 @@ eslint-plugin-simple-import-sort "^7.0.0" eslint-plugin-unused-imports "^1.1.5" +"@adiwajshing/keyed-db@^0.2.4": + version "0.2.4" + resolved "https://registry.yarnpkg.com/@adiwajshing/keyed-db/-/keyed-db-0.2.4.tgz#2a09e88fce20b2672deb60a7750c5fe3ab0dfd99" + integrity sha512-yprSnAtj80/VKuDqRcFFLDYltoNV8tChNwFfIgcf6PGD4sjzWIBgs08pRuTqGH5mk5wgL6PBRSsMCZqtZwzFEw== + "@babel/code-frame@7.12.11": version "7.12.11" resolved "https://registry.yarnpkg.com/@babel/code-frame/-/code-frame-7.12.11.tgz#f4ad435aa263db935b8f10f2c552d23fb716a63f"