Merge branch 'master' into pr/2472

This commit is contained in:
Adhiraj Singh
2023-03-02 15:22:46 +05:30
20 changed files with 1020 additions and 871 deletions

View File

@@ -4,4 +4,5 @@ coverage
*.lock
.eslintrc.json
src/WABinary/index.ts
WAProto
WAProto
WASignalGroup

View File

@@ -1,5 +1,6 @@
import { Boom } from '@hapi/boom'
import makeWASocket, { AnyMessageContent, delay, DisconnectReason, fetchLatestBaileysVersion, isJidBroadcast, makeCacheableSignalKeyStore, makeInMemoryStore, MessageRetryMap, useMultiFileAuthState } from '../src'
import NodeCache from 'node-cache'
import makeWASocket, { AnyMessageContent, delay, DisconnectReason, fetchLatestBaileysVersion, makeCacheableSignalKeyStore, makeInMemoryStore, useMultiFileAuthState } from '../src'
import MAIN_LOGGER from '../src/Utils/logger'
const logger = MAIN_LOGGER.child({ })
@@ -10,7 +11,7 @@ const doReplies = !process.argv.includes('--no-reply')
// external map to store retry counts of messages when decryption/encryption fails
// keep this out of the socket itself, so as to prevent a message decryption/encryption loop across socket restarts
const msgRetryCounterMap: MessageRetryMap = { }
const msgRetryCounterCache = new NodeCache()
// the store maintains the data of the WA connection in memory
// can be written out to a file & read from it
@@ -37,11 +38,11 @@ const startSock = async() => {
/** caching makes the store faster to send/recv messages */
keys: makeCacheableSignalKeyStore(state.keys, logger),
},
msgRetryCounterMap,
msgRetryCounterCache,
generateHighQualityLinkPreview: true,
// ignore all broadcast messages -- to receive the same
// comment the line below out
shouldIgnoreJid: jid => isJidBroadcast(jid),
// shouldIgnoreJid: jid => isJidBroadcast(jid),
// implement to handle retries
getMessage: async key => {
if(store) {

View File

@@ -1,11 +1,11 @@
module.exports = {
"roots": [
"<rootDir>/src"
'roots': [
'<rootDir>/src'
],
"testMatch": [
"**/Tests/test.*.+(ts|tsx|js)",
'testMatch': [
'**/Tests/test.*.+(ts|tsx|js)',
],
"transform": {
"^.+\\.(ts|tsx)$": "ts-jest"
'transform': {
'^.+\\.(ts|tsx)$': 'ts-jest'
},
}

View File

@@ -24,8 +24,8 @@
"build:tsc": "tsc",
"example": "node --inspect -r ts-node/register Example/example.ts",
"gen:protobuf": "sh WAProto/GenerateStatics.sh",
"lint": "eslint ./src --ext .js,.ts,.jsx,.tsx",
"lint:fix": "eslint ./src --fix --ext .js,.ts,.jsx,.tsx"
"lint": "eslint . --ext .js,.ts,.jsx,.tsx",
"lint:fix": "eslint . --fix --ext .js,.ts,.jsx,.tsx"
},
"author": "Adhiraj Singh",
"license": "MIT",
@@ -34,7 +34,7 @@
},
"dependencies": {
"@hapi/boom": "^9.1.3",
"axios": "^0.24.0",
"axios": "^1.3.3",
"futoin-hkdf": "^1.5.1",
"libsignal": "git+https://github.com/adiwajshing/libsignal-node",
"music-metadata": "^7.12.3",

View File

@@ -1,3 +1,3 @@
{
"version": [2, 2243, 7]
"version": [2, 2308, 7]
}

View File

@@ -99,4 +99,11 @@ export const MEDIA_KEYS = Object.keys(MEDIA_PATH_MAP) as MediaType[]
export const MIN_PREKEY_COUNT = 5
export const INITIAL_PREKEY_COUNT = 30
export const INITIAL_PREKEY_COUNT = 30
export const DEFAULT_CACHE_TTLS = {
SIGNAL_STORE: 5 * 60, // 5 minutes
MSG_RETRY: 60 * 60, // 1 hour
CALL_OFFER: 5 * 60, // 5 minutes
USER_DEVICES: 5 * 60, // 5 minutes
}

View File

@@ -339,7 +339,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
name,
version: state.version.toString(),
// return snapshot if being synced from scratch
return_snapshot: (!state.version).toString()
'return_snapshot': (!state.version).toString()
}
})
}

View File

@@ -1,8 +1,9 @@
import NodeCache from 'node-cache'
import { proto } from '../../WAProto'
import { KEY_BUNDLE_TYPE, MIN_PREKEY_COUNT } from '../Defaults'
import { DEFAULT_CACHE_TTLS, KEY_BUNDLE_TYPE, MIN_PREKEY_COUNT } from '../Defaults'
import { MessageReceiptType, MessageRelayOptions, MessageUserReceipt, SocketConfig, WACallEvent, WAMessageKey, WAMessageStubType, WAPatchName } from '../Types'
import { decodeMediaRetryNode, decodeMessageStanza, delay, encodeBigEndian, encodeSignedDeviceIdentity, getCallStatusFromNode, getHistoryMsg, getNextPreKeys, getStatusFromReceiptType, unixTimestampSeconds, xmppPreKey, xmppSignedPreKey } from '../Utils'
import { decodeMediaRetryNode, decryptMessageNode, delay, encodeBigEndian, encodeSignedDeviceIdentity, getCallStatusFromNode, getHistoryMsg, getNextPreKeys, getStatusFromReceiptType, unixTimestampSeconds, xmppPreKey, xmppSignedPreKey } from '../Utils'
import { makeMutex } from '../Utils/make-mutex'
import { cleanMessage } from '../Utils/process-message'
import { areJidsSameUser, BinaryNode, getAllBinaryNodeChildren, getBinaryNodeChild, getBinaryNodeChildren, isJidGroup, isJidUser, jidDecode, jidNormalizedUser, S_WHATSAPP_NET } from '../WABinary'
@@ -36,8 +37,14 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
/** this mutex ensures that each retryRequest will wait for the previous one to finish */
const retryMutex = makeMutex()
const msgRetryMap = config.msgRetryCounterMap || { }
const callOfferData: { [id: string]: WACallEvent } = { }
const msgRetryCache = config.msgRetryCounterCache || new NodeCache({
stdTTL: DEFAULT_CACHE_TTLS.MSG_RETRY, // 1 hour
useClones: false
})
const callOfferCache = config.callOfferCache || new NodeCache({
stdTTL: DEFAULT_CACHE_TTLS.CALL_OFFER, // 5 mins
useClones: false
})
let sendActiveReceipts = false
@@ -90,15 +97,15 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
const sendRetryRequest = async(node: BinaryNode, forceIncludeKeys = false) => {
const msgId = node.attrs.id
let retryCount = msgRetryMap[msgId] || 0
let retryCount = msgRetryCache.get<number>(msgId) || 0
if(retryCount >= 5) {
logger.debug({ retryCount, msgId }, 'reached retry limit, clearing')
delete msgRetryMap[msgId]
msgRetryCache.del(msgId)
return
}
retryCount += 1
msgRetryMap[msgId] = retryCount
msgRetryCache.set(msgId, retryCount)
const { account, signedPreKey, signedIdentityKey: identityKey } = authState.creds
@@ -362,13 +369,14 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
const willSendMessageAgain = (id: string, participant: string) => {
const key = `${id}:${participant}`
const retryCount = msgRetryMap[key] || 0
const retryCount = msgRetryCache.get<number>(key) || 0
return retryCount < 5
}
const updateSendMessageAgainCount = (id: string, participant: string) => {
const key = `${id}:${participant}`
msgRetryMap[key] = (msgRetryMap[key] || 0) + 1
const newValue = (msgRetryCache.get<number>(key) || 0) + 1
msgRetryCache.set(key, newValue)
}
const sendMessagesAgain = async(
@@ -535,7 +543,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
}
const handleMessage = async(node: BinaryNode) => {
const { fullMessage: msg, category, author, decrypt } = decodeMessageStanza(node, authState)
const { fullMessage: msg, category, author, decrypt } = decryptMessageNode(node, authState)
if(shouldIgnoreJid(msg.key.remoteJid!)) {
logger.debug({ key: msg.key }, 'ignored message')
await sendMessageAck(node)
@@ -618,18 +626,20 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
if(status === 'offer') {
call.isVideo = !!getBinaryNodeChild(infoChild, 'video')
call.isGroup = infoChild.attrs.type === 'group'
callOfferData[call.id] = call
callOfferCache.set(call.id, call)
}
const existingCall = callOfferCache.get<WACallEvent>(call.id)
// use existing call info to populate this event
if(callOfferData[call.id]) {
call.isVideo = callOfferData[call.id].isVideo
call.isGroup = callOfferData[call.id].isGroup
if(existingCall) {
call.isVideo = existingCall.isVideo
call.isGroup = existingCall.isGroup
}
// delete data once call has ended
if(status === 'reject' || status === 'accept' || status === 'timeout') {
delete callOfferData[call.id]
callOfferCache.del(call.id)
}
ev.emit('call', [call])

View File

@@ -2,7 +2,7 @@
import { Boom } from '@hapi/boom'
import NodeCache from 'node-cache'
import { proto } from '../../WAProto'
import { WA_DEFAULT_EPHEMERAL } from '../Defaults'
import { DEFAULT_CACHE_TTLS, WA_DEFAULT_EPHEMERAL } from '../Defaults'
import { AnyMessageContent, MediaConnInfo, MessageReceiptType, MessageRelayOptions, MiscMessageGenerationOptions, SocketConfig, WAMessageKey } from '../Types'
import { aggregateMessageKeysNotFromMe, assertMediaContent, bindWaitForEvent, decryptMediaRetryData, encodeSignedDeviceIdentity, encodeWAMessage, encryptMediaRetryRequest, encryptSenderKeyMsgSignalProto, encryptSignalProto, extractDeviceJids, generateMessageID, generateWAMessage, getStatusCodeForMediaRetry, getUrlFromDirectPath, getWAUploadToServer, jidToSignalProtocolAddress, parseAndInjectE2ESessions, unixTimestampSeconds } from '../Utils'
import { getUrlInfo } from '../Utils/link-preview'
@@ -32,7 +32,7 @@ export const makeMessagesSocket = (config: SocketConfig) => {
} = sock
const userDevicesCache = config.userDevicesCache || new NodeCache({
stdTTL: 300, // 5 minutes
stdTTL: DEFAULT_CACHE_TTLS.USER_DEVICES, // 5 minutes
useClones: false
})
@@ -53,7 +53,10 @@ export const makeMessagesSocket = (config: SocketConfig) => {
const mediaConnNode = getBinaryNodeChild(result, 'media_conn')
const node: MediaConnInfo = {
hosts: getBinaryNodeChildren(mediaConnNode, 'host').map(
item => item.attrs as any
({ attrs }) => ({
hostname: attrs.hostname,
maxContentLengthBytes: +attrs.maxContentLengthBytes,
})
),
auth: mediaConnNode!.attrs.auth,
ttl: +mediaConnNode!.attrs.ttl,
@@ -144,8 +147,9 @@ export const makeMessagesSocket = (config: SocketConfig) => {
for(let jid of jids) {
const user = jidDecode(jid)?.user
jid = jidNormalizedUser(jid)
if(userDevicesCache.has(user!) && useCache) {
const devices = userDevicesCache.get<JidWithDevice[]>(user!)!
const devices = userDevicesCache.get<JidWithDevice[]>(user!)
if(devices && useCache) {
deviceResults.push(...devices)
logger.trace({ user }, 'using cache for devices')
@@ -319,7 +323,7 @@ export const makeMessagesSocket = (config: SocketConfig) => {
// only send to the specific device that asked for a retry
// otherwise the message is sent out to every device that should be a recipient
if(!isGroup) {
additionalAttributes = { ...additionalAttributes, device_fanout: 'false' }
additionalAttributes = { ...additionalAttributes, 'device_fanout': 'false' }
}
const { user, device } = jidDecode(participant.jid)!
@@ -636,6 +640,7 @@ export const makeMessagesSocket = (config: SocketConfig) => {
),
upload: waUploadToServer,
mediaCache: config.mediaCache,
options: config.options,
...options,
}
)

View File

@@ -32,12 +32,13 @@ export const makeSocket = ({
}: SocketConfig) => {
const ws = new WebSocket(waWebSocketUrl, undefined, {
origin: DEFAULT_ORIGIN,
headers: options.headers,
headers: options.headers as {},
handshakeTimeout: connectTimeoutMs,
timeout: connectTimeoutMs,
agent
})
ws.setMaxListeners(0)
const ev = makeEventBuffer(logger)
/** ephemeral key pair used to encrypt/decrypt communication. Unique for each connection */
const ephemeralKeyPair = Curve.generateKeyPair()
@@ -65,7 +66,17 @@ export const makeSocket = ({
}
const bytes = noise.encodeFrame(data)
await sendPromise.call(ws, bytes) as Promise<void>
await promiseTimeout<void>(
connectTimeoutMs,
async(resolve, reject) => {
try {
await sendPromise.call(ws, bytes)
resolve()
} catch(error) {
reject(error)
}
}
)
}
/** send a binary node */
@@ -97,7 +108,7 @@ export const makeSocket = ({
const result = promiseTimeout<any>(connectTimeoutMs, (resolve, reject) => {
onOpen = (data: any) => resolve(data)
onClose = reject
onClose = mapWebSocketError(reject)
ws.on('frame', onOpen)
ws.on('close', onClose)
ws.on('error', onClose)
@@ -335,7 +346,7 @@ export const makeSocket = ({
let onClose: (err: Error) => void
await new Promise((resolve, reject) => {
onOpen = () => resolve(undefined)
onClose = reject
onClose = mapWebSocketError(reject)
ws.on('open', onOpen)
ws.on('close', onClose)
ws.on('error', onClose)
@@ -433,12 +444,7 @@ export const makeSocket = ({
end(err)
}
})
ws.on('error', error => end(
new Boom(
`WebSocket Error (${error.message})`,
{ statusCode: getCodeFromWSError(error), data: error }
)
))
ws.on('error', mapWebSocketError(end))
ws.on('close', () => end(new Boom('Connection Terminated', { statusCode: DisconnectReason.connectionClosed })))
// the server terminated the connection
ws.on('CB:xmlstreamend', () => end(new Boom('Connection Terminated by Server', { statusCode: DisconnectReason.connectionClosed })))
@@ -604,4 +610,19 @@ export const makeSocket = ({
}
}
/**
* map the websocket error to the right type
* so it can be retried by the caller
* */
function mapWebSocketError(handler: (err: Error) => void) {
return (error: Error) => {
handler(
new Boom(
`WebSocket Error (${error.message})`,
{ statusCode: getCodeFromWSError(error), data: error }
)
)
}
}
export type Socket = ReturnType<typeof makeSocket>

View File

@@ -1,3 +1,4 @@
import { AxiosRequestConfig } from 'axios'
import type NodeCache from 'node-cache'
import type { Logger } from 'pino'
import type { Readable } from 'stream'
@@ -5,6 +6,7 @@ import type { URL } from 'url'
import { proto } from '../../WAProto'
import { MEDIA_HKDF_KEY_MAPPING } from '../Defaults'
import type { GroupMetadata } from './GroupMetadata'
import { CacheStore } from './Socket'
// export the WAMessage Prototypes
export { proto as WAProto }
@@ -210,9 +212,11 @@ export type MediaGenerationOptions = {
mediaTypeOverride?: MediaType
upload: WAMediaUploadFunction
/** cache media so it does not have to be uploaded again */
mediaCache?: NodeCache
mediaCache?: CacheStore
mediaUploadTimeoutMs?: number
options?: AxiosRequestConfig
}
export type MessageContentGenerationOptions = MediaGenerationOptions & {
getUrlInfo?: (text: string) => Promise<WAUrlInfo | undefined>

View File

@@ -1,7 +1,6 @@
import { AxiosRequestConfig } from 'axios'
import type { Agent } from 'https'
import type NodeCache from 'node-cache'
import type { Logger } from 'pino'
import type { URL } from 'url'
import { proto } from '../../WAProto'
@@ -11,33 +10,40 @@ import { MediaConnInfo } from './Message'
export type WAVersion = [number, number, number]
export type WABrowserDescription = [string, string, string]
export type MessageRetryMap = { [msgId: string]: number }
export type CacheStore = {
/** get a cached key and change the stats */
get<T>(key: string): T | undefined
/** set a key in the cache */
set<T>(key: string, value: T): void
/** delete a key from the cache */
del(key: string): void
/** flush all data */
flushAll(): void
}
export type SocketConfig = {
/** the WS url to connect to WA */
waWebSocketUrl: string | URL
/** Fails the connection if the socket times out in this interval */
connectTimeoutMs: number
connectTimeoutMs: number
/** Default timeout for queries, undefined for no timeout */
defaultQueryTimeoutMs: number | undefined
/** ping-pong interval for WS connection */
keepAliveIntervalMs: number
/** proxy agent */
agent?: Agent
agent?: Agent
/** pino logger */
logger: Logger
logger: Logger
/** version to connect with */
version: WAVersion
/** override browser config */
browser: WABrowserDescription
/** agent used for fetch requests -- uploading/downloading media */
fetchAgent?: Agent
browser: WABrowserDescription
/** agent used for fetch requests -- uploading/downloading media */
fetchAgent?: Agent
/** should the QR be printed in the terminal */
printQRInTerminal: boolean
/** should events be emitted for actions done by this socket connection */
emitOwnEvents: boolean
/** provide a cache to store media, so does not have to be re-uploaded */
mediaCache?: NodeCache
/** custom upload hosts to upload media to */
customUploadHosts: MediaConnInfo['hosts']
/** time to wait between sending new retry requests */
@@ -50,14 +56,19 @@ export type SocketConfig = {
shouldSyncHistoryMessage: (msg: proto.Message.IHistorySyncNotification) => boolean
/** transaction capability options for SignalKeyStore */
transactionOpts: TransactionCapabilityOptions
/** provide a cache to store a user's device list */
userDevicesCache?: NodeCache
/** marks the client as online whenever the socket successfully connects */
markOnlineOnConnect: boolean
/** provide a cache to store media, so does not have to be re-uploaded */
mediaCache?: CacheStore
/**
* map to store the retry counts for failed messages;
* used to determine whether to retry a message or not */
msgRetryCounterMap?: MessageRetryMap
msgRetryCounterCache?: CacheStore
/** provide a cache to store a user's device list */
userDevicesCache?: CacheStore
/** cache to store call offers */
callOfferCache?: CacheStore
/** width for link preview images */
linkPreviewImageThumbnailWidth: number
/** Should Baileys ask the phone for full history, will be received async */
@@ -92,7 +103,7 @@ export type SocketConfig = {
}
/** options for axios */
options: AxiosRequestConfig<any>
options: AxiosRequestConfig<{}>
/**
* fetch a message from your store
* implement this so that messages failed to send (solves the "this message can take a while" issue) can be retried

View File

@@ -1,7 +1,8 @@
import { randomBytes } from 'crypto'
import NodeCache from 'node-cache'
import type { Logger } from 'pino'
import type { AuthenticationCreds, SignalDataSet, SignalDataTypeMap, SignalKeyStore, SignalKeyStoreWithTransaction, TransactionCapabilityOptions } from '../Types'
import { DEFAULT_CACHE_TTLS } from '../Defaults'
import type { AuthenticationCreds, CacheStore, SignalDataSet, SignalDataTypeMap, SignalKeyStore, SignalKeyStoreWithTransaction, TransactionCapabilityOptions } from '../Types'
import { Curve, signedKeyPair } from './crypto'
import { delay, generateRegistrationId } from './generics'
@@ -9,16 +10,17 @@ import { delay, generateRegistrationId } from './generics'
* Adds caching capability to a SignalKeyStore
* @param store the store to add caching to
* @param logger to log trace events
* @param opts NodeCache options
* @param _cache cache store to use
*/
export function makeCacheableSignalKeyStore(
store: SignalKeyStore,
logger: Logger,
opts?: NodeCache.Options
_cache?: CacheStore
): SignalKeyStore {
const cache = new NodeCache({
...opts || { },
const cache = _cache || new NodeCache({
stdTTL: DEFAULT_CACHE_TTLS.SIGNAL_STORE, // 5 minutes
useClones: false,
deleteOnExpire: true,
})
function getUniqueId(type: string, id: string) {

View File

@@ -9,7 +9,11 @@ const NO_MESSAGE_FOUND_ERROR_TEXT = 'Message absent from node'
type MessageType = 'chat' | 'peer_broadcast' | 'other_broadcast' | 'group' | 'direct_peer_status' | 'other_status'
export const decodeMessageStanza = (stanza: BinaryNode, auth: AuthenticationState) => {
/**
* Decode the received node as a message.
* @note this will only parse the message, not decrypt it
*/
export function decodeMessageNode(stanza: BinaryNode, meId: string) {
let msgType: MessageType
let chatId: string
let author: string
@@ -19,7 +23,7 @@ export const decodeMessageStanza = (stanza: BinaryNode, auth: AuthenticationStat
const participant: string | undefined = stanza.attrs.participant
const recipient: string | undefined = stanza.attrs.recipient
const isMe = (jid: string) => areJidsSameUser(jid, auth.creds.me!.id)
const isMe = (jid: string) => areJidsSameUser(jid, meId)
if(isJidUser(from)) {
if(recipient) {
@@ -60,8 +64,6 @@ export const decodeMessageStanza = (stanza: BinaryNode, auth: AuthenticationStat
throw new Boom('Unknown message type', { data: stanza })
}
const sender = msgType === 'chat' ? author : chatId
const fromMe = isMe(stanza.attrs.participant || stanza.attrs.from)
const pushname = stanza.attrs.notify
@@ -75,13 +77,23 @@ export const decodeMessageStanza = (stanza: BinaryNode, auth: AuthenticationStat
const fullMessage: proto.IWebMessageInfo = {
key,
messageTimestamp: +stanza.attrs.t,
pushName: pushname
pushName: pushname,
broadcast: isJidBroadcast(from)
}
if(key.fromMe) {
fullMessage.status = proto.WebMessageInfo.Status.SERVER_ACK
}
return {
fullMessage,
author,
sender: msgType === 'chat' ? author : chatId
}
}
export const decryptMessageNode = (stanza: BinaryNode, auth: AuthenticationState) => {
const { fullMessage, author, sender } = decodeMessageNode(stanza, auth.creds.me!.id)
return {
fullMessage,
category: stanza.attrs.category,

View File

@@ -138,13 +138,13 @@ export const delayCancellable = (ms: number) => {
export async function promiseTimeout<T>(ms: number | undefined, promise: (resolve: (v?: T) => void, reject: (error) => void) => void) {
if(!ms) {
return new Promise (promise)
return new Promise(promise)
}
const stack = new Error().stack
// Create a promise that rejects in <ms> milliseconds
const { delay, cancel } = delayCancellable (ms)
const p = new Promise ((resolve, reject) => {
const p = new Promise((resolve, reject) => {
delay
.then(() => reject(
new Boom('Timed Out', {
@@ -353,7 +353,10 @@ export const getCodeFromWSError = (error: Error) => {
if(!Number.isNaN(code) && code >= 400) {
statusCode = code
}
} else if((error as any).code?.startsWith('E')) { // handle ETIMEOUT, ENOTFOUND etc
} else if(
(error as any).code?.startsWith('E')
|| error?.message?.includes('timed out')
) { // handle ETIMEOUT, ENOTFOUND etc
statusCode = 408
}

View File

@@ -1,3 +1,4 @@
import { AxiosRequestConfig } from 'axios'
import { Logger } from 'pino'
import { WAMediaUploadFunction, WAUrlInfo } from '../Types'
import { prepareWAMessageMedia } from './messages'
@@ -21,7 +22,7 @@ export type URLGenerationOptions = {
/** Timeout in ms */
timeout: number
proxyUrl?: string
headers?: { [key: string]: string }
headers?: AxiosRequestConfig<{}>['headers']
}
uploadImage?: WAMediaUploadFunction
logger?: Logger
@@ -47,7 +48,10 @@ export const getUrlInfo = async(
previewLink = 'https://' + previewLink
}
const info = await getLinkPreview(previewLink, opts.fetchOpts)
const info = await getLinkPreview(previewLink, {
...opts.fetchOpts,
headers: opts.fetchOpts as {}
})
if(info && 'title' in info && info.title) {
const [image] = info.images
@@ -62,7 +66,11 @@ export const getUrlInfo = async(
if(opts.uploadImage) {
const { imageMessage } = await prepareWAMessageMedia(
{ image: { url: image } },
{ upload: opts.uploadImage, mediaTypeOverride: 'thumbnail-link' }
{
upload: opts.uploadImage,
mediaTypeOverride: 'thumbnail-link',
options: opts.fetchOpts
}
)
urlInfo.jpegThumbnail = imageMessage?.jpegThumbnail
? Buffer.from(imageMessage.jpegThumbnail)

View File

@@ -192,8 +192,11 @@ export async function getAudioDuration(buffer: Buffer | string | Readable) {
metadata = await musicMetadata.parseBuffer(buffer, undefined, { duration: true })
} else if(typeof buffer === 'string') {
const rStream = createReadStream(buffer)
metadata = await musicMetadata.parseStream(rStream, undefined, { duration: true })
rStream.close()
try {
metadata = await musicMetadata.parseStream(rStream, undefined, { duration: true })
} finally {
rStream.destroy()
}
} else {
metadata = await musicMetadata.parseStream(buffer, undefined, { duration: true })
}
@@ -209,29 +212,29 @@ export const toReadable = (buffer: Buffer) => {
}
export const toBuffer = async(stream: Readable) => {
let buff = Buffer.alloc(0)
const chunks: Buffer[] = []
for await (const chunk of stream) {
buff = Buffer.concat([ buff, chunk ])
chunks.push(chunk)
}
stream.destroy()
return buff
return Buffer.concat(chunks)
}
export const getStream = async(item: WAMediaUpload) => {
export const getStream = async(item: WAMediaUpload, opts?: AxiosRequestConfig) => {
if(Buffer.isBuffer(item)) {
return { stream: toReadable(item), type: 'buffer' }
return { stream: toReadable(item), type: 'buffer' } as const
}
if('stream' in item) {
return { stream: item.stream, type: 'readable' }
return { stream: item.stream, type: 'readable' } as const
}
if(item.url.toString().startsWith('http://') || item.url.toString().startsWith('https://')) {
return { stream: await getHttpStream(item.url), type: 'remote' }
return { stream: await getHttpStream(item.url, opts), type: 'remote' } as const
}
return { stream: createReadStream(item.url), type: 'file' }
return { stream: createReadStream(item.url), type: 'file' } as const
}
/** generates a thumbnail for a given media, if required */
@@ -278,21 +281,23 @@ export const getHttpStream = async(url: string | URL, options: AxiosRequestConfi
return fetched.data as Readable
}
type EncryptedStreamOptions = {
saveOriginalFileIfRequired?: boolean
logger?: Logger
opts?: AxiosRequestConfig
}
export const encryptedStream = async(
media: WAMediaUpload,
mediaType: MediaType,
saveOriginalFileIfRequired = true,
logger?: Logger
{ logger, saveOriginalFileIfRequired, opts }: EncryptedStreamOptions = {}
) => {
const { stream, type } = await getStream(media)
const { stream, type } = await getStream(media, opts)
logger?.debug('fetched media stream')
const mediaKey = Crypto.randomBytes(32)
const { cipherKey, iv, macKey } = getMediaKeys(mediaKey, mediaType)
// random name
//const encBodyPath = join(getTmpFilesDirectory(), mediaType + generateMessageID() + '.enc')
// const encWriteStream = createWriteStream(encBodyPath)
const encWriteStream = new Readable({ read: () => {} })
let bodyPath: string | undefined
@@ -312,15 +317,23 @@ export const encryptedStream = async(
let sha256Plain = Crypto.createHash('sha256')
let sha256Enc = Crypto.createHash('sha256')
const onChunk = (buff: Buffer) => {
sha256Enc = sha256Enc.update(buff)
hmac = hmac.update(buff)
encWriteStream.push(buff)
}
try {
for await (const data of stream) {
fileLength += data.length
if(
type === 'remote'
&& opts?.maxContentLength
&& fileLength + data.length > opts.maxContentLength
) {
throw new Boom(
`content length exceeded when encrypting "${type}"`,
{
data: { media, type }
}
)
}
sha256Plain = sha256Plain.update(data)
if(writeStream) {
if(!writeStream.write(data)) {
@@ -342,7 +355,7 @@ export const encryptedStream = async(
encWriteStream.push(mac)
encWriteStream.push(null)
writeStream && writeStream.end()
writeStream?.end()
stream.destroy()
logger?.debug('encrypted data successfully')
@@ -366,8 +379,22 @@ export const encryptedStream = async(
sha256Enc.destroy(error)
stream.destroy(error)
if(didSaveToTmpPath) {
try {
await fs.unlink(bodyPath!)
} catch(err) {
logger?.error({ err }, 'failed to save to tmp path')
}
}
throw error
}
function onChunk(buff: Buffer) {
sha256Enc = sha256Enc.update(buff)
hmac = hmac.update(buff)
encWriteStream.push(buff)
}
}
const DEF_HOST = 'mmg.whatsapp.net'
@@ -421,14 +448,14 @@ export const downloadEncryptedContent = async(
const endChunk = endByte ? toSmallestChunkSize(endByte || 0) + AES_CHUNK_SIZE : undefined
const headers: { [_: string]: string } = {
const headers: AxiosRequestConfig['headers'] = {
...options?.headers || { },
Origin: DEFAULT_ORIGIN,
}
if(startChunk || endChunk) {
headers.Range = `bytes=${startChunk}-`
headers!.Range = `bytes=${startChunk}-`
if(endChunk) {
headers.Range += endChunk
headers!.Range += endChunk
}
}
@@ -644,7 +671,7 @@ export const encryptMediaRetryRequest = (
tag: 'rmr',
attrs: {
jid: key.remoteJid!,
from_me: (!!key.fromMe).toString(),
'from_me': (!!key.fromMe).toString(),
// @ts-ignore
participant: key.participant || undefined
}

View File

@@ -151,7 +151,11 @@ export const prepareWAMessageMedia = async(
} = await encryptedStream(
uploadData.media,
options.mediaTypeOverride || mediaType,
requiresOriginalForSomeProcessing
{
logger,
saveOriginalFileIfRequired: requiresOriginalForSomeProcessing,
opts: options.options
}
)
// url safe Base64 encode the SHA256 hash of the body
const fileEncSha256B64 = fileEncSha256.toString('base64')

View File

@@ -3,7 +3,7 @@ import type { Logger } from 'pino'
import { proto } from '../../WAProto'
import { AuthenticationCreds, BaileysEventEmitter, Chat, GroupMetadata, ParticipantAction, SignalKeyStoreWithTransaction, WAMessageStubType } from '../Types'
import { downloadAndProcessHistorySyncNotification, getContentType, normalizeMessageContent, toNumber } from '../Utils'
import { areJidsSameUser, jidNormalizedUser } from '../WABinary'
import { areJidsSameUser, isJidBroadcast, isJidStatusBroadcast, jidNormalizedUser } from '../WABinary'
type ProcessMessageContext = {
shouldProcessHistoryMsg: boolean
@@ -72,6 +72,22 @@ export const shouldIncrementChatUnread = (message: proto.IWebMessageInfo) => (
!message.key.fromMe && !message.messageStubType
)
/**
* Get the ID of the chat from the given key.
* Typically -- that'll be the remoteJid, but for broadcasts, it'll be the participant
*/
export const getChatId = ({ remoteJid, participant, fromMe }: proto.IMessageKey) => {
if(
isJidBroadcast(remoteJid!)
&& !isJidStatusBroadcast(remoteJid!)
&& !fromMe
) {
return participant!
}
return remoteJid!
}
const processMessage = async(
message: proto.IWebMessageInfo,
{
@@ -86,7 +102,7 @@ const processMessage = async(
const meId = creds.me!.id
const { accountSettings } = creds
const chat: Partial<Chat> = { id: jidNormalizedUser(message.key.remoteJid!) }
const chat: Partial<Chat> = { id: jidNormalizedUser(getChatId(message.key)) }
const isRealMsg = isRealMessage(message, meId)
if(isRealMsg) {

1541
yarn.lock

File diff suppressed because it is too large Load Diff