mirror of
https://github.com/FranP-code/Baileys.git
synced 2025-10-13 00:32:22 +00:00
Added function to change read status for chat, updated connect
This commit is contained in:
@@ -309,23 +309,27 @@ export class WAConnection extends EventEmitter {
|
||||
|
||||
this.state = 'close'
|
||||
this.msgCount = 0
|
||||
this.conn?.removeAllListeners ('close')
|
||||
this.conn?.close()
|
||||
this.conn = null
|
||||
this.phoneConnected = false
|
||||
this.lastDisconnectReason = reason
|
||||
this.lastSeen = null
|
||||
|
||||
|
||||
this.endConnection ()
|
||||
|
||||
if (reason === 'invalid_session' || reason === 'intentional') {
|
||||
this.pendingRequests.forEach (({reject}) => reject(new Error('close')))
|
||||
this.pendingRequests = []
|
||||
}
|
||||
|
||||
this.removePendingCallbacks ()
|
||||
|
||||
// reconnecting if the timeout is active for the reconnect loop
|
||||
this.emit ('close', { reason, isReconnecting })
|
||||
}
|
||||
protected removePendingCallbacks () {
|
||||
protected endConnection () {
|
||||
this.conn?.removeAllListeners ('close')
|
||||
this.conn?.close()
|
||||
this.conn = null
|
||||
this.lastSeen = null
|
||||
|
||||
Object.keys(this.callbacks).forEach(key => {
|
||||
if (!key.includes('function:')) {
|
||||
this.log (`cancelling message wait: ${key}`, MessageLogLevel.info)
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import * as Utils from './Utils'
|
||||
import { WAMessage, WAChat, MessageLogLevel, WANode, KEEP_ALIVE_INTERVAL_MS, BaileysError, WAConnectOptions, DisconnectReason, UNAUTHORIZED_CODES, WAContact, TimedOutError } from './Constants'
|
||||
import { WAMessage, WAChat, MessageLogLevel, WANode, KEEP_ALIVE_INTERVAL_MS, BaileysError, WAConnectOptions, DisconnectReason, UNAUTHORIZED_CODES, WAContact, TimedOutError, CancelledError } from './Constants'
|
||||
import {WAConnection as Base} from './1.Validation'
|
||||
import Decoder from '../Binary/Decoder'
|
||||
|
||||
@@ -53,18 +53,28 @@ export class WAConnection extends Base {
|
||||
protected async connectInternal (options: WAConnectOptions, delayMs?: number) {
|
||||
// actual connect
|
||||
const connect = () => {
|
||||
const tasks: Promise<void>[] = []
|
||||
const timeoutMs = options?.timeoutMs || 60*1000
|
||||
|
||||
const { ws, cancel } = Utils.openWebSocketConnection (5000, false)
|
||||
|
||||
let task = ws
|
||||
.then (conn => this.conn = conn)
|
||||
.then (() => this.conn.on('message', data => this.onMessageRecieved(data as any)))
|
||||
.then (() => (
|
||||
this.log(`connected to WhatsApp Web server, authenticating via ${reconnectID ? 'reconnect' : 'takeover'}`, MessageLogLevel.info)
|
||||
))
|
||||
.then (() => this.authenticate(reconnectID))
|
||||
.then (() => {
|
||||
this.conn
|
||||
.removeAllListeners ('error')
|
||||
.removeAllListeners('close')
|
||||
})
|
||||
|
||||
let cancelTask: () => void
|
||||
if (typeof options?.waitForChats === 'undefined' ? true : options?.waitForChats) {
|
||||
const {waitForChats, cancelChats} = this.receiveChatsAndContacts(true)
|
||||
|
||||
const {waitForChats, cancelChats} = this.receiveChatsAndContacts(timeoutMs, true)
|
||||
tasks.push (waitForChats)
|
||||
|
||||
cancellations.push (cancelChats)
|
||||
task = Promise.all ([task, waitForChats]).then (() => {})
|
||||
cancelTask = () => { cancelChats(); cancel() }
|
||||
} else cancelTask = cancel
|
||||
|
||||
@@ -74,33 +84,15 @@ export class WAConnection extends Base {
|
||||
this.lastDisconnectReason !== DisconnectReason.intentional && this.user?.jid
|
||||
const reconnectID = shouldUseReconnect ? this.user.jid.replace ('@s.whatsapp.net', '@c.us') : null
|
||||
|
||||
const promise = Utils.promiseTimeout(timeoutMs, (resolve, reject) => {
|
||||
ws
|
||||
.then (conn => this.conn = conn)
|
||||
.then (() => this.conn.on('message', data => this.onMessageRecieved(data as any)))
|
||||
.then (() => (
|
||||
this.log(`connected to WhatsApp Web server, authenticating via ${reconnectID ? 'reconnect' : 'takeover'}`, MessageLogLevel.info)
|
||||
))
|
||||
.then (() => this.authenticate(reconnectID))
|
||||
.then (() => (
|
||||
this.conn
|
||||
.removeAllListeners ('error')
|
||||
.removeAllListeners('close')
|
||||
))
|
||||
.then (resolve)
|
||||
.catch (reject)
|
||||
})
|
||||
const promise = Utils.promiseTimeout(timeoutMs, (resolve, reject) => (
|
||||
task.then (resolve).catch (reject)
|
||||
))
|
||||
.catch (err => {
|
||||
this.removePendingCallbacks ()
|
||||
this.endConnection ()
|
||||
throw err
|
||||
}) as Promise<void>
|
||||
|
||||
tasks.push (promise)
|
||||
|
||||
return {
|
||||
promise: Promise.all (tasks),
|
||||
cancel: cancelTask
|
||||
}
|
||||
return { promise, cancel: cancelTask }
|
||||
}
|
||||
|
||||
let promise = Promise.resolve ()
|
||||
@@ -120,7 +112,6 @@ export class WAConnection extends Base {
|
||||
.then (() => {
|
||||
const {promise, cancel} = connect ()
|
||||
cancellations.push (cancel)
|
||||
|
||||
return promise
|
||||
})
|
||||
.finally (() => {
|
||||
@@ -133,7 +124,7 @@ export class WAConnection extends Base {
|
||||
* Must be called immediately after connect
|
||||
* @returns [chats, contacts]
|
||||
*/
|
||||
protected receiveChatsAndContacts(timeoutMs: number = null, stopAfterMostRecentMessage: boolean=false) {
|
||||
protected receiveChatsAndContacts(stopAfterMostRecentMessage: boolean=false) {
|
||||
this.contacts = {}
|
||||
this.chats.clear ()
|
||||
|
||||
@@ -222,23 +213,22 @@ export class WAConnection extends Base {
|
||||
})
|
||||
|
||||
// wait for the chats & contacts to load
|
||||
const {delay, cancel} = Utils.delayCancellable (timeoutMs)
|
||||
|
||||
const waitForChats = Promise.race ([
|
||||
new Promise (resolve => resolveTask = resolve),
|
||||
delay.then (() => { throw TimedOutError() })
|
||||
])
|
||||
.then (() => (
|
||||
this.chats
|
||||
.all ()
|
||||
.forEach (chat => {
|
||||
const respectiveContact = this.contacts[chat.jid]
|
||||
chat.name = respectiveContact?.name || respectiveContact?.notify || chat.name
|
||||
let cancelChats: () => void
|
||||
const waitForChats = new Promise ((resolve, reject) => {
|
||||
resolveTask = resolve
|
||||
cancelChats = () => reject (CancelledError())
|
||||
})
|
||||
))
|
||||
.finally (deregisterCallbacks)
|
||||
.then (() => (
|
||||
this.chats
|
||||
.all ()
|
||||
.forEach (chat => {
|
||||
const respectiveContact = this.contacts[chat.jid]
|
||||
chat.name = respectiveContact?.name || respectiveContact?.notify || chat.name
|
||||
})
|
||||
))
|
||||
.finally (deregisterCallbacks)
|
||||
|
||||
return { waitForChats, cancelChats: cancel }
|
||||
return { waitForChats, cancelChats }
|
||||
}
|
||||
private releasePendingRequests () {
|
||||
this.pendingRequests.forEach (({resolve}) => resolve()) // send off all pending request
|
||||
|
||||
@@ -41,16 +41,49 @@ export class WAConnection extends Base {
|
||||
return info
|
||||
}
|
||||
/**
|
||||
* Read/unread messages of a chat; will mark the entire chat read by default
|
||||
* Marks a chat as read/unread; updates the chat object too
|
||||
* @param jid the ID of the person/group whose message you want to mark read
|
||||
* @param unread unreads the chat, if true
|
||||
*/
|
||||
async chatRead (jid: string, type: 'unread' | 'read' = 'read') {
|
||||
jid = whatsappID (jid)
|
||||
const chat = this.assertChatGet (jid)
|
||||
|
||||
if (type === 'unread') await this.sendReadReceipt (jid, null, -2)
|
||||
else if (chat.count !== 0) {
|
||||
let messageID: string
|
||||
|
||||
let messages: WAMessage[]
|
||||
let cursor: any
|
||||
|
||||
messages = chat.messages
|
||||
cursor = messages[messages.length-1]?.key
|
||||
|
||||
do {
|
||||
const m = messages.reverse().find (m => !m.key.fromMe)
|
||||
if (m) messageID = m.key.id
|
||||
|
||||
const obj = await this.loadMessages (jid, 10, cursor)
|
||||
messages = obj.messages
|
||||
cursor = obj.cursor
|
||||
|
||||
if (messages.length === 0) throw new BaileysError ('no valid message found to read', { status: 404 })
|
||||
} while (!messageID)
|
||||
|
||||
await this.sendReadReceipt (jid, messageID, Math.abs(chat.count))
|
||||
}
|
||||
|
||||
chat.count = type === 'unread' ? -1 : 0
|
||||
this.emit ('chat-update', {jid, count: chat.count})
|
||||
}
|
||||
/**
|
||||
* Sends a read receipt for a given message;
|
||||
* does not update the chat do @see chatRead
|
||||
* @param jid the ID of the person/group whose message you want to mark read
|
||||
* @param messageID optionally, the message ID
|
||||
* @param count number of messages to read, set to < 0 to unread a message
|
||||
*/
|
||||
async sendReadReceipt(jid: string, messageID?: string, count?: number) {
|
||||
jid = whatsappID (jid)
|
||||
const chat = this.chats.get(jid)
|
||||
count = count || Math.abs(chat?.count || 1)
|
||||
|
||||
async sendReadReceipt(jid: string, messageID: string, count: number) {
|
||||
const attributes = {
|
||||
jid: jid,
|
||||
count: count.toString(),
|
||||
@@ -58,10 +91,6 @@ export class WAConnection extends Base {
|
||||
owner: messageID ? 'false' : null
|
||||
}
|
||||
const read = await this.setQuery ([['read', attributes, null]])
|
||||
if (chat) {
|
||||
chat.count = count > 0 ? Math.max(chat.count-count, 0) : -1
|
||||
this.emit ('chat-update', {jid, count: chat.count})
|
||||
}
|
||||
return read
|
||||
}
|
||||
/**
|
||||
|
||||
@@ -88,24 +88,16 @@ export async function promiseTimeout<T>(ms: number, promise: (resolve: (v?: T)=>
|
||||
if (!ms) return new Promise (promise)
|
||||
|
||||
// Create a promise that rejects in <ms> milliseconds
|
||||
const {delay, cancel} = delayCancellable (ms)
|
||||
|
||||
let pReject: (error) => void
|
||||
let {delay, cancel} = delayCancellable (ms)
|
||||
const p = new Promise ((resolve, reject) => {
|
||||
delay
|
||||
.then(() => reject(TimedOutError()))
|
||||
.catch (err => reject(err))
|
||||
|
||||
promise (resolve, reject)
|
||||
pReject = reject
|
||||
})
|
||||
|
||||
try {
|
||||
const content = await Promise.race([
|
||||
p,
|
||||
delay.then(() => pReject(TimedOutError()))
|
||||
])
|
||||
cancel ()
|
||||
return content as T
|
||||
} finally {
|
||||
cancel ()
|
||||
}
|
||||
.finally (cancel)
|
||||
return p as Promise<T>
|
||||
}
|
||||
|
||||
export const openWebSocketConnection = (timeoutMs: number, retryOnNetworkError: boolean) => {
|
||||
|
||||
Reference in New Issue
Block a user