compute chat deltas with connect

This commit is contained in:
Adhiraj
2020-09-03 18:25:43 +05:30
parent 33448690a4
commit 3d5b37fc44
10 changed files with 178 additions and 111 deletions

View File

@@ -47,33 +47,21 @@ describe('Test Connect', () => {
const conn = new WAConnection()
conn.connectOptions.timeoutMs = 20*1000
await conn
.loadAuthInfo (auth)
.connect ()
.then (conn => {
assert.ok(conn.user)
assert.ok(conn.user.jid)
await conn.loadAuthInfo (auth).connect ()
assert.ok(conn.user)
assert.ok(conn.user.jid)
const chatArray = conn.chats.all()
if (chatArray.length > 0) {
assert.ok(chatArray[0].jid)
assert.ok(chatArray[0].count !== null)
if (chatArray[0].messages.length > 0) {
assert.ok(chatArray[0].messages[0])
}
}
})
.then (() => conn.logout())
.then (() => conn.loadAuthInfo(auth))
.then (() => (
conn.connect()
.then (() => assert.fail('should not have reconnected'))
.catch (err => {
assert.ok (err instanceof BaileysError)
assert.ok ((err as BaileysError).status >= 400)
})
))
.finally (() => conn.close())
assertChatDBIntegrity (conn)
await conn.logout()
conn.loadAuthInfo(auth)
await conn.connect()
.then (() => assert.fail('should not have reconnected'))
.catch (err => {
assert.ok (err instanceof BaileysError)
assert.ok ((err as BaileysError).status >= 400)
})
conn.close()
})
it ('should disconnect & reconnect phone', async () => {
const conn = new WAConnection ()
@@ -237,6 +225,31 @@ describe ('Reconnects', () => {
})
describe ('Pending Requests', () => {
it ('should correctly send updates', async () => {
const conn = new WAConnection ()
conn.connectOptions.timeoutMs = 20*1000
conn.pendingRequestTimeoutMs = null
conn.loadAuthInfo('./auth_info.json')
await conn.connect ()
conn.close ()
const oldChat = conn.chats.all()[0]
oldChat.archive = 'true' // mark the first chat as archived
oldChat.modify_tag = '1234' // change modify tag to detect change
const result = await conn.connect ()
assert.ok (!result.newConnection)
const chat = result.updatedChats[oldChat.jid]
assert.ok (chat)
assert.ok ('archive' in chat)
assert.equal (Object.keys(chat).length, 2)
conn.close ()
})
it('should queue requests when closed', async () => {
const conn = new WAConnection ()
conn.pendingRequestTimeoutMs = null

View File

@@ -33,7 +33,7 @@ export class WAConnection extends EventEmitter {
user: WAUser
/** What level of messages to log to the console */
logLevel: MessageLogLevel = MessageLogLevel.info
/** Should requests be queued when the connection breaks in between; if false, then an error will be thrown */
/** Should requests be queued when the connection breaks in between; if 0, then an error will be thrown */
pendingRequestTimeoutMs: number = null
/** The connection state */
state: WAConnectionState = 'close'
@@ -43,7 +43,7 @@ export class WAConnection extends EventEmitter {
timeoutMs: 60*1000,
waitForChats: true,
maxRetries: 5,
connectCooldownMs: 5000
connectCooldownMs: 2250
}
/** When to auto-reconnect */
autoReconnect = ReconnectMode.onConnectionLost
@@ -71,7 +71,7 @@ export class WAConnection extends EventEmitter {
protected lastSeen: Date = null // last keep alive received
protected qrTimeout: NodeJS.Timeout
protected lastConnectTime: Date = null
protected lastDisconnectTime: Date = null
protected lastDisconnectReason: DisconnectReason
constructor () {
@@ -85,7 +85,7 @@ export class WAConnection extends EventEmitter {
* @param options the connect options
*/
async connect() {
return this
return null
}
async unexpectedDisconnect (error: DisconnectReason) {
const willReconnect =
@@ -313,6 +313,7 @@ export class WAConnection extends EventEmitter {
this.msgCount = 0
this.phoneConnected = false
this.lastDisconnectReason = reason
this.lastDisconnectTime = new Date ()
this.endConnection ()

View File

@@ -1,31 +1,36 @@
import * as Utils from './Utils'
import { WAMessage, WAChat, MessageLogLevel, WANode, KEEP_ALIVE_INTERVAL_MS, BaileysError, WAConnectOptions, DisconnectReason, UNAUTHORIZED_CODES, WAContact, TimedOutError, CancelledError } from './Constants'
import { WAMessage, WAChat, MessageLogLevel, WANode, KEEP_ALIVE_INTERVAL_MS, BaileysError, WAConnectOptions, DisconnectReason, UNAUTHORIZED_CODES, WAContact, TimedOutError, CancelledError, WAOpenResult } from './Constants'
import {WAConnection as Base} from './1.Validation'
import Decoder from '../Binary/Decoder'
export class WAConnection extends Base {
/** Connect to WhatsApp Web */
async connect() {
async connect () {
// if we're already connected, throw an error
if (this.state !== 'close') throw new Error('cannot connect when state=' + this.state)
const options = this.connectOptions
const newConnection = !this.authInfo
this.state = 'connecting'
this.emit ('connecting')
let tries = 0
let lastConnect = this.lastDisconnectTime
var updates
while (this.state === 'connecting') {
tries += 1
try {
const diff = this.lastConnectTime ? new Date().getTime()-this.lastConnectTime.getTime() : Infinity
await this.connectInternal (
const diff = lastConnect ? new Date().getTime()-lastConnect.getTime() : Infinity
updates = await this.connectInternal (
options,
diff > this.connectOptions.connectCooldownMs ? 0 : this.connectOptions.connectCooldownMs
)
this.phoneConnected = true
this.state = 'open'
} catch (error) {
lastConnect = new Date()
const loggedOut = error instanceof BaileysError && UNAUTHORIZED_CODES.includes(error.status)
const willReconnect = !loggedOut && (tries <= (options?.maxRetries || 5)) && this.state === 'connecting'
@@ -36,12 +41,12 @@ export class WAConnection extends Base {
}
if (!willReconnect) throw error
} finally {
this.lastConnectTime = new Date()
}
}
this.emit ('open')
const updatedChats = !!this.lastDisconnectTime && updates
const result: WAOpenResult = { newConnection, updatedChats }
this.emit ('open', result)
this.releasePendingRequests ()
this.startKeepAliveRequest()
@@ -50,8 +55,7 @@ export class WAConnection extends Base {
this.conn.on ('close', () => this.unexpectedDisconnect (DisconnectReason.close))
return this
return result
}
/** Meat of the connect logic */
protected async connectInternal (options: WAConnectOptions, delayMs?: number) {
@@ -61,7 +65,7 @@ export class WAConnection extends Base {
const { ws, cancel } = Utils.openWebSocketConnection (5000, false)
let task = ws
let task: Promise<void | { [k: string]: Partial<WAChat> }> = ws
.then (conn => this.conn = conn)
.then (() => (
this.conn.on('message', data => this.onMessageRecieved(data as any))
@@ -78,16 +82,17 @@ export class WAConnection extends Base {
let cancelTask: () => void
if (typeof options?.waitForChats === 'undefined' ? true : options?.waitForChats) {
const {waitForChats, cancelChats} = this.receiveChatsAndContacts(true)
const {waitForChats, cancelChats} = this.receiveChatsAndContacts()
task = Promise.all ([task, waitForChats]).then (() => {})
task = Promise.all ([task, waitForChats]).then (([_, updates]) => updates)
cancelTask = () => { cancelChats(); cancel() }
} else cancelTask = cancel
// determine whether reconnect should be used or not
const shouldUseReconnect = this.lastDisconnectReason !== DisconnectReason.replaced &&
this.lastDisconnectReason !== DisconnectReason.unknown &&
this.lastDisconnectReason !== DisconnectReason.intentional && this.user?.jid
this.lastDisconnectReason !== DisconnectReason.intentional &&
this.user?.jid
const reconnectID = shouldUseReconnect ? this.user.jid.replace ('@s.whatsapp.net', '@c.us') : null
const promise = Utils.promiseTimeout(timeoutMs, (resolve, reject) => (
@@ -96,7 +101,7 @@ export class WAConnection extends Base {
.catch (err => {
this.endConnection ()
throw err
}) as Promise<void>
}) as Promise<void | { [k: string]: Partial<WAChat> }>
return { promise, cancel: cancelTask }
}
@@ -114,23 +119,27 @@ export class WAConnection extends Base {
cancellations.push (cancel)
}
return promise
.then (() => {
const {promise, cancel} = connect ()
cancellations.push (cancel)
return promise
})
.finally (() => {
cancel()
this.off('close', cancel)
})
try {
await promise
const result = connect ()
cancellations.push (result.cancel)
const final = await result.promise
return final
} finally {
cancel()
this.off('close', cancel)
}
}
/**
* Sets up callbacks to receive chats, contacts & messages.
* Must be called immediately after connect
* @returns [chats, contacts]
*/
protected receiveChatsAndContacts(stopAfterMostRecentMessage: boolean=false) {
protected receiveChatsAndContacts() {
const oldChats: {[k: string]: WAChat} = this.chats['dict']
this.contacts = {}
this.chats.clear ()
@@ -141,21 +150,19 @@ export class WAConnection extends Base {
const deregisterCallbacks = () => {
// wait for actual messages to load, "last" is the most recent message, "before" contains prior messages
this.deregisterCallback(['action', 'add:last'])
if (!stopAfterMostRecentMessage) {
this.deregisterCallback(['action', 'add:before'])
this.deregisterCallback(['action', 'add:unread'])
}
this.deregisterCallback(['action', 'add:before'])
this.deregisterCallback(['action', 'add:unread'])
this.deregisterCallback(['response', 'type:chat'])
this.deregisterCallback(['response', 'type:contacts'])
}
const checkForResolution = () => {
if (receivedContacts && receivedMessages) resolveTask ()
}
const checkForResolution = () => receivedContacts && receivedMessages && resolveTask ()
// wait for messages to load
const chatUpdate = json => {
receivedMessages = true
const isLast = json[1].last || stopAfterMostRecentMessage
const isLast = json[1].last
const messages = json[2] as WANode[]
if (messages) {
@@ -171,10 +178,9 @@ export class WAConnection extends Base {
// wait for actual messages to load, "last" is the most recent message, "before" contains prior messages
this.registerCallback(['action', 'add:last'], chatUpdate)
if (!stopAfterMostRecentMessage) {
this.registerCallback(['action', 'add:before'], chatUpdate)
this.registerCallback(['action', 'add:unread'], chatUpdate)
}
this.registerCallback(['action', 'add:before'], chatUpdate)
this.registerCallback(['action', 'add:unread'], chatUpdate)
// get chats
this.registerCallback(['response', 'type:chat'], json => {
if (json[1].duplicate || !json[2]) return
@@ -185,6 +191,7 @@ export class WAConnection extends Base {
this.log (`unexpectedly got null chat: ${item}, ${chat}`, MessageLogLevel.info)
return
}
chat.jid = Utils.whatsappID (chat.jid)
chat.t = +chat.t
chat.count = +chat.count
@@ -220,21 +227,33 @@ export class WAConnection extends Base {
// wait for the chats & contacts to load
let cancelChats: () => void
const waitForChats = new Promise ((resolve, reject) => {
resolveTask = resolve
cancelChats = () => reject (CancelledError())
})
.then (() => (
this.chats
.all ()
.forEach (chat => {
const waitForChats = async () => {
try {
await new Promise ((resolve, reject) => {
resolveTask = resolve
cancelChats = () => reject (CancelledError())
})
const updatedChats: { [k: string]: Partial<WAChat> } = {}
for (let chat of this.chats.all()) {
const respectiveContact = this.contacts[chat.jid]
chat.name = respectiveContact?.name || respectiveContact?.notify || chat.name
})
))
.finally (deregisterCallbacks)
if (!oldChats[chat.jid]) {
updatedChats[chat.jid] = chat
} else if (oldChats[chat.jid].t < chat.t || oldChats[chat.jid].modify_tag !== chat.modify_tag) {
const changes = Utils.shallowChanges (oldChats[chat.jid], chat)
delete changes.messages
return { waitForChats, cancelChats }
updatedChats[chat.jid] = changes
}
}
return updatedChats
} finally {
deregisterCallbacks ()
}
}
return { waitForChats: waitForChats (), cancelChats }
}
private releasePendingRequests () {
this.pendingRequests.forEach (({resolve}) => resolve()) // send off all pending request

View File

@@ -1,6 +1,6 @@
import * as QR from 'qrcode-terminal'
import { WAConnection as Base } from './3.Connect'
import { WAMessageStatusUpdate, WAMessage, WAContact, WAChat, WAMessageProto, WA_MESSAGE_STUB_TYPE, WA_MESSAGE_STATUS_TYPE, MessageLogLevel, PresenceUpdate, BaileysEvent, DisconnectReason, WANode } from './Constants'
import { WAMessageStatusUpdate, WAMessage, WAContact, WAChat, WAMessageProto, WA_MESSAGE_STUB_TYPE, WA_MESSAGE_STATUS_TYPE, MessageLogLevel, PresenceUpdate, BaileysEvent, DisconnectReason, WANode, WAOpenResult } from './Constants'
import { whatsappID, unixTimestampSeconds, isGroupID, toNumber } from './Utils'
export class WAConnection extends Base {
@@ -143,18 +143,7 @@ export class WAConnection extends Base {
}
this.registerCallback('Msg', func)
this.registerCallback('MsgInfo', func)
/*// genetic chat action
this.registerCallback (['Chat', 'cmd:action'], json => {
const data = json[1].data as WANode
if (!data) return
this.log (data, MessageLogLevel.info)
if (data[0] === 'create') {
}
})*/
this.on ('qr', qr => QR.generate(qr, { small: true }))
}
/** Get the URL to download the profile picture of a person/group */
@@ -300,7 +289,7 @@ export class WAConnection extends Base {
// Add all event types
/** when the connection has opened successfully */
on (event: 'open', listener: () => void): this
on (event: 'open', listener: (result: WAOpenResult) => void): this
/** when the connection is opening */
on (event: 'connecting', listener: () => void): this
/** when the connection has closed */

View File

@@ -7,7 +7,7 @@ import {
WAUrlInfo,
WAMessageContent, WAMetric, WAFlag, WANode, WAMessage, WAMessageProto, BaileysError, MessageLogLevel, WA_MESSAGE_STATUS_TYPE
} from './Constants'
import { whatsappID } from './Utils'
import { whatsappID, delay } from './Utils'
export class WAConnection extends Base {
@@ -173,6 +173,27 @@ export class WAConnection extends Base {
}
return loadMessage() as Promise<void>
}
/**
* Find a message in a given conversation
* @param chunkSize the number of messages to load in a single request
* @param onMessage callback for every message retreived, if return true -- the loop will break
*/
async findMessage (jid: string, chunkSize: number, onMessage: (m: WAMessage) => boolean) {
const chat = this.chats.get (whatsappID(jid))
let count = chat?.messages?.length || chunkSize
let offsetID
while (true) {
const {messages, cursor} = await this.loadMessages(jid, count, offsetID, true)
// callback with most recent message first (descending order of date)
for (let i = messages.length - 1; i >= 0; i--) {
if (onMessage(messages[i])) return
}
if (messages.length === 0) return
// if there are more messages
offsetID = cursor
await delay (200)
}
}
/** Load a single message specified by the ID */
async loadMessage (jid: string, messageID: string) {
// load the message before the given message

View File

@@ -219,7 +219,6 @@ export enum WAFlag {
}
/** Tag used with binary queries */
export type WATag = [WAMetric, WAFlag]
/** set of statuses visible to other people; see updatePresence() in WhatsAppWeb.Send */
export enum Presence {
available = 'available', // "online"
@@ -310,6 +309,15 @@ export interface WAMessageStatusUpdate {
/** Status of the Message IDs */
type: WA_MESSAGE_STATUS_TYPE
}
export interface WAOpenResult {
/** Was this connection opened via a QR scan */
newConnection: boolean
updatedChats?: {
[k: string]: Partial<WAChat>
}
}
export enum GroupSettingChange {
messageSend = 'announcement',
settingsChange = 'locked',

View File

@@ -33,6 +33,21 @@ export const waChatUniqueKey = (c: WAChat) => ((c.t*100000) + (hashCode(c.jid)%1
export const whatsappID = (jid: string) => jid?.replace ('@c.us', '@s.whatsapp.net')
export const isGroupID = (jid: string) => jid?.includes ('@g.us')
export function shallowChanges <T> (old: T, current: T): Partial<T> {
let changes: Partial<T> = {}
for (let key in current) {
if (old[key] !== current[key]) {
changes[key] = current[key] || null
}
}
for (let key in old) {
if (!changes[key] && old[key] !== current[key]) {
changes[key] = current[key] || null
}
}
return changes
}
/** decrypt AES 256 CBC; where the IV is prefixed to the buffer */
export function aesDecrypt(buffer: Buffer, key: Buffer) {
return aesDecryptWithIV(buffer.slice(16, buffer.length), key, buffer.slice(0, 16))