From bff86ed4c1fec3f42ed46e6ccbdead378f4107b6 Mon Sep 17 00:00:00 2001 From: canove Date: Mon, 5 May 2025 21:11:40 -0300 Subject: [PATCH 1/5] fix: memory blow on large files sending through URL --- src/Utils/messages-media.ts | 98 ++++++++++++++++++++----------------- src/Utils/messages.ts | 37 ++++++-------- 2 files changed, 68 insertions(+), 67 deletions(-) diff --git a/src/Utils/messages-media.ts b/src/Utils/messages-media.ts index 0816131..788ee06 100644 --- a/src/Utils/messages-media.ts +++ b/src/Utils/messages-media.ts @@ -345,24 +345,35 @@ export const encryptedStream = async( const mediaKey = Crypto.randomBytes(32) const { cipherKey, iv, macKey } = await getMediaKeys(mediaKey, mediaType) - const encWriteStream = new Readable({ read: () => {} }) + + const encFilePath = join( + getTmpFilesDirectory(), + mediaType + generateMessageIDV2() + "-enc" + ); + const encFileWriteStream = createWriteStream(encFilePath); - let bodyPath: string | undefined - let writeStream: WriteStream | undefined - let didSaveToTmpPath = false - if(type === 'file') { - bodyPath = (media as WAMediaPayloadURL).url.toString() - } else if(saveOriginalFileIfRequired) { - bodyPath = join(getTmpFilesDirectory(), mediaType + generateMessageIDV2()) - writeStream = createWriteStream(bodyPath) - didSaveToTmpPath = true - } + let originalFileStream: WriteStream | undefined; + let originalFilePath: string | undefined; + + if (saveOriginalFileIfRequired) { + originalFilePath = join( + getTmpFilesDirectory(), + mediaType + generateMessageIDV2() + "-original" + ); + originalFileStream = createWriteStream(originalFilePath); + } let fileLength = 0 const aes = Crypto.createCipheriv('aes-256-cbc', cipherKey, iv) - let hmac = Crypto.createHmac('sha256', macKey!).update(iv) - let sha256Plain = Crypto.createHash('sha256') - let sha256Enc = Crypto.createHash('sha256') + const hmac = Crypto.createHmac('sha256', macKey!).update(iv) + const sha256Plain = Crypto.createHash('sha256') + const sha256Enc = Crypto.createHash('sha256') + + const onChunk = (buff: Buffer) => { + sha256Enc.update(buff); + hmac.update(buff); + encFileWriteStream.write(buff); + }; try { for await (const data of stream) { @@ -379,68 +390,62 @@ export const encryptedStream = async( data: { media, type } } ) - } - - sha256Plain = sha256Plain.update(data) - if(writeStream && !writeStream.write(data)) { - await once(writeStream, 'drain') - } + } + + if (originalFileStream) { + if (!originalFileStream.write(data)) { + await once(originalFileStream, "drain"); + } + } + sha256Plain.update(data) onChunk(aes.update(data)) } onChunk(aes.final()) const mac = hmac.digest().slice(0, 10) - sha256Enc = sha256Enc.update(mac) + sha256Enc.update(mac) const fileSha256 = sha256Plain.digest() const fileEncSha256 = sha256Enc.digest() - encWriteStream.push(mac) - encWriteStream.push(null) + encFileWriteStream.write(mac); - writeStream?.end() - stream.destroy() + encFileWriteStream.end(); + originalFileStream?.end?.(); + stream.destroy(); logger?.debug('encrypted data successfully') return { mediaKey, - encWriteStream, - bodyPath, + originalFilePath, + encFilePath, mac, fileEncSha256, fileSha256, - fileLength, - didSaveToTmpPath + fileLength } } catch(error) { // destroy all streams with error - encWriteStream.destroy() - writeStream?.destroy() + encFileWriteStream.destroy() + originalFileStream?.destroy?.() aes.destroy() hmac.destroy() sha256Plain.destroy() sha256Enc.destroy() stream.destroy() - if(didSaveToTmpPath) { - try { - await fs.unlink(bodyPath!) - } catch(err) { - logger?.error({ err }, 'failed to save to tmp path') - } - } - + + try { + await fs.unlink(encFilePath) + if (originalFilePath) await fs.unlink(originalFilePath) + } catch(err) { + logger?.error({ err }, 'failed deleting tmp files') + } throw error } - - function onChunk(buff: Buffer) { - sha256Enc = sha256Enc.update(buff) - hmac = hmac.update(buff) - encWriteStream.push(buff) - } } const DEF_HOST = 'mmg.whatsapp.net' @@ -620,7 +625,8 @@ export const getWAUploadToServer = ( url, stream, { - ...options, + ...options, + maxRedirects: 0, headers: { ...options.headers || { }, 'Content-Type': 'application/octet-stream', diff --git a/src/Utils/messages.ts b/src/Utils/messages.ts index b164fc8..335933d 100644 --- a/src/Utils/messages.ts +++ b/src/Utils/messages.ts @@ -1,7 +1,7 @@ import { Boom } from '@hapi/boom' import axios from 'axios' import { randomBytes } from 'crypto' -import { promises as fs } from 'fs' +import { createReadStream, promises as fs } from 'fs' import { type Transform } from 'stream' import { proto } from '../../WAProto' import { MEDIA_KEYS, URL_REGEX, WA_DEFAULT_EPHEMERAL } from '../Defaults' @@ -155,15 +155,14 @@ export const prepareWAMessageMedia = async( (typeof uploadData['jpegThumbnail'] === 'undefined') const requiresWaveformProcessing = mediaType === 'audio' && uploadData.ptt === true const requiresAudioBackground = options.backgroundColor && mediaType === 'audio' && uploadData.ptt === true - const requiresOriginalForSomeProcessing = requiresDurationComputation || requiresThumbnailComputation + const requiresOriginalForSomeProcessing = requiresDurationComputation || requiresThumbnailComputation const { mediaKey, - encWriteStream, - bodyPath, + encFilePath, + originalFilePath, fileEncSha256, fileSha256, - fileLength, - didSaveToTmpPath + fileLength } = await encryptedStream( uploadData.media, options.mediaTypeOverride || mediaType, @@ -178,7 +177,7 @@ export const prepareWAMessageMedia = async( const [{ mediaUrl, directPath }] = await Promise.all([ (async() => { const result = await options.upload( - encWriteStream, + createReadStream(encFilePath), { fileEncSha256B64, mediaType, timeoutMs: options.mediaUploadTimeoutMs } ) logger?.debug({ mediaType, cacheableKey }, 'uploaded media') @@ -190,7 +189,7 @@ export const prepareWAMessageMedia = async( const { thumbnail, originalImageDimensions - } = await generateThumbnail(bodyPath!, mediaType as 'image' | 'video', options) + } = await generateThumbnail(originalFilePath!, mediaType as 'image' | 'video', options) uploadData.jpegThumbnail = thumbnail if(!uploadData.width && originalImageDimensions) { uploadData.width = originalImageDimensions.width @@ -202,12 +201,12 @@ export const prepareWAMessageMedia = async( } if(requiresDurationComputation) { - uploadData.seconds = await getAudioDuration(bodyPath!) + uploadData.seconds = await getAudioDuration(originalFilePath!) logger?.debug('computed audio duration') } if(requiresWaveformProcessing) { - uploadData.waveform = await getAudioWaveform(bodyPath!, logger) + uploadData.waveform = await getAudioWaveform(originalFilePath!, logger) logger?.debug('processed waveform') } @@ -222,17 +221,13 @@ export const prepareWAMessageMedia = async( ]) .finally( async() => { - encWriteStream.destroy() - // remove tmp files - if(didSaveToTmpPath && bodyPath) { - try { - await fs.access(bodyPath) - await fs.unlink(bodyPath) - logger?.debug('removed tmp file') - } catch(error) { - logger?.warn('failed to remove tmp file') - } - } + try { + await fs.unlink(encFilePath) + if (originalFilePath) await fs.unlink(originalFilePath) + logger?.debug('removed tmp files') + } catch(error) { + logger?.warn('failed to remove tmp file') + } } ) From 53c77dd11cda5b7d6ada53aa2031e61a558d3bde Mon Sep 17 00:00:00 2001 From: canove Date: Mon, 5 May 2025 21:20:25 -0300 Subject: [PATCH 2/5] chore: lint --- src/Utils/messages-media.ts | 93 +++++++++++++++++++------------------ 1 file changed, 48 insertions(+), 45 deletions(-) diff --git a/src/Utils/messages-media.ts b/src/Utils/messages-media.ts index 788ee06..ea04080 100644 --- a/src/Utils/messages-media.ts +++ b/src/Utils/messages-media.ts @@ -345,35 +345,35 @@ export const encryptedStream = async( const mediaKey = Crypto.randomBytes(32) const { cipherKey, iv, macKey } = await getMediaKeys(mediaKey, mediaType) - - const encFilePath = join( - getTmpFilesDirectory(), - mediaType + generateMessageIDV2() + "-enc" - ); - const encFileWriteStream = createWriteStream(encFilePath); - let originalFileStream: WriteStream | undefined; - let originalFilePath: string | undefined; + const encFilePath = join( + getTmpFilesDirectory(), + mediaType + generateMessageIDV2() + '-enc' + ) + const encFileWriteStream = createWriteStream(encFilePath) - if (saveOriginalFileIfRequired) { - originalFilePath = join( - getTmpFilesDirectory(), - mediaType + generateMessageIDV2() + "-original" - ); - originalFileStream = createWriteStream(originalFilePath); - } + let originalFileStream: WriteStream | undefined + let originalFilePath: string | undefined + + if(saveOriginalFileIfRequired) { + originalFilePath = join( + getTmpFilesDirectory(), + mediaType + generateMessageIDV2() + '-original' + ) + originalFileStream = createWriteStream(originalFilePath) + } let fileLength = 0 const aes = Crypto.createCipheriv('aes-256-cbc', cipherKey, iv) const hmac = Crypto.createHmac('sha256', macKey!).update(iv) const sha256Plain = Crypto.createHash('sha256') - const sha256Enc = Crypto.createHash('sha256') - - const onChunk = (buff: Buffer) => { - sha256Enc.update(buff); - hmac.update(buff); - encFileWriteStream.write(buff); - }; + const sha256Enc = Crypto.createHash('sha256') + + const onChunk = (buff: Buffer) => { + sha256Enc.update(buff) + hmac.update(buff) + encFileWriteStream.write(buff) + } try { for await (const data of stream) { @@ -390,13 +390,13 @@ export const encryptedStream = async( data: { media, type } } ) - } - - if (originalFileStream) { - if (!originalFileStream.write(data)) { - await once(originalFileStream, "drain"); - } - } + } + + if(originalFileStream) { + if(!originalFileStream.write(data)) { + await once(originalFileStream, 'drain') + } + } sha256Plain.update(data) onChunk(aes.update(data)) @@ -410,22 +410,22 @@ export const encryptedStream = async( const fileSha256 = sha256Plain.digest() const fileEncSha256 = sha256Enc.digest() - encFileWriteStream.write(mac); + encFileWriteStream.write(mac) - encFileWriteStream.end(); - originalFileStream?.end?.(); - stream.destroy(); + encFileWriteStream.end() + originalFileStream?.end?.() + stream.destroy() logger?.debug('encrypted data successfully') return { mediaKey, - originalFilePath, - encFilePath, + originalFilePath, + encFilePath, mac, fileEncSha256, fileSha256, - fileLength + fileLength } } catch(error) { // destroy all streams with error @@ -437,13 +437,16 @@ export const encryptedStream = async( sha256Enc.destroy() stream.destroy() - - try { - await fs.unlink(encFilePath) - if (originalFilePath) await fs.unlink(originalFilePath) - } catch(err) { - logger?.error({ err }, 'failed deleting tmp files') - } + + try { + await fs.unlink(encFilePath) + if(originalFilePath) { + await fs.unlink(originalFilePath) + } + } catch(err) { + logger?.error({ err }, 'failed deleting tmp files') + } + throw error } } @@ -625,8 +628,8 @@ export const getWAUploadToServer = ( url, stream, { - ...options, - maxRedirects: 0, + ...options, + maxRedirects: 0, headers: { ...options.headers || { }, 'Content-Type': 'application/octet-stream', From 8cc8b44724fd28d0e4c72ab81ed1451b4650ee24 Mon Sep 17 00:00:00 2001 From: canove Date: Mon, 5 May 2025 21:33:35 -0300 Subject: [PATCH 3/5] chore: lint --- src/Utils/messages.ts | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/Utils/messages.ts b/src/Utils/messages.ts index 335933d..510d3dc 100644 --- a/src/Utils/messages.ts +++ b/src/Utils/messages.ts @@ -155,14 +155,14 @@ export const prepareWAMessageMedia = async( (typeof uploadData['jpegThumbnail'] === 'undefined') const requiresWaveformProcessing = mediaType === 'audio' && uploadData.ptt === true const requiresAudioBackground = options.backgroundColor && mediaType === 'audio' && uploadData.ptt === true - const requiresOriginalForSomeProcessing = requiresDurationComputation || requiresThumbnailComputation + const requiresOriginalForSomeProcessing = requiresDurationComputation || requiresThumbnailComputation const { mediaKey, encFilePath, originalFilePath, fileEncSha256, fileSha256, - fileLength + fileLength } = await encryptedStream( uploadData.media, options.mediaTypeOverride || mediaType, @@ -221,13 +221,16 @@ export const prepareWAMessageMedia = async( ]) .finally( async() => { - try { - await fs.unlink(encFilePath) - if (originalFilePath) await fs.unlink(originalFilePath) - logger?.debug('removed tmp files') - } catch(error) { - logger?.warn('failed to remove tmp file') - } + try { + await fs.unlink(encFilePath) + if(originalFilePath) { + await fs.unlink(originalFilePath) + } + + logger?.debug('removed tmp files') + } catch(error) { + logger?.warn('failed to remove tmp file') + } } ) From f58a38fde96297858f372b5abdbb75bc92c438db Mon Sep 17 00:00:00 2001 From: canove Date: Tue, 6 May 2025 08:06:05 -0300 Subject: [PATCH 4/5] fix: allow media upload retries --- src/Types/Message.ts | 2 +- src/Utils/business.ts | 18 +++++++++++++++--- src/Utils/messages-media.ts | 4 ++-- src/Utils/messages.ts | 2 +- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/Types/Message.ts b/src/Types/Message.ts index b2f6f5f..f0623b6 100644 --- a/src/Types/Message.ts +++ b/src/Types/Message.ts @@ -236,7 +236,7 @@ export type MessageGenerationOptionsFromContent = MiscMessageGenerationOptions & userJid: string } -export type WAMediaUploadFunction = (readStream: Readable, opts: { fileEncSha256B64: string, mediaType: MediaType, timeoutMs?: number }) => Promise<{ mediaUrl: string, directPath: string }> +export type WAMediaUploadFunction = (encFilePath: string, opts: { fileEncSha256B64: string, mediaType: MediaType, timeoutMs?: number }) => Promise<{ mediaUrl: string, directPath: string }> export type MediaGenerationOptions = { logger?: ILogger diff --git a/src/Utils/business.ts b/src/Utils/business.ts index 57459a5..5c0bb2b 100644 --- a/src/Utils/business.ts +++ b/src/Utils/business.ts @@ -1,7 +1,11 @@ import { Boom } from '@hapi/boom' import { createHash } from 'crypto' +import { createWriteStream, promises as fs } from 'fs' +import { tmpdir } from 'os' +import { join } from 'path' import { CatalogCollection, CatalogStatus, OrderDetails, OrderProduct, Product, ProductCreate, ProductUpdate, WAMediaUpload, WAMediaUploadFunction } from '../Types' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, getBinaryNodeChildString } from '../WABinary' +import { generateMessageIDV2 } from './generics' import { getStream, getUrlFromDirectPath, toReadable } from './messages-media' export const parseCatalogNode = (node: BinaryNode) => { @@ -235,22 +239,30 @@ export const uploadingNecessaryImages = async( const { stream } = await getStream(img) const hasher = createHash('sha256') - const contentBlocks: Buffer[] = [] + + const filePath = join(tmpdir(), 'img' + generateMessageIDV2()) + const encFileWriteStream = createWriteStream(filePath) + for await (const block of stream) { hasher.update(block) - contentBlocks.push(block) + encFileWriteStream.write(block) } const sha = hasher.digest('base64') const { directPath } = await waUploadToServer( - toReadable(Buffer.concat(contentBlocks)), + filePath, { mediaType: 'product-catalog-image', fileEncSha256B64: sha, timeoutMs } ) + + await fs + .unlink(filePath) + .catch(err => console.log('Error deleting temp file ', err)) + return { url: getUrlFromDirectPath(directPath) } } ) diff --git a/src/Utils/messages-media.ts b/src/Utils/messages-media.ts index ea04080..7dd30cb 100644 --- a/src/Utils/messages-media.ts +++ b/src/Utils/messages-media.ts @@ -606,7 +606,7 @@ export const getWAUploadToServer = ( { customUploadHosts, fetchAgent, logger, options }: SocketConfig, refreshMediaConn: (force: boolean) => Promise, ): WAMediaUploadFunction => { - return async(stream, { mediaType, fileEncSha256B64, timeoutMs }) => { + return async(filePath, { mediaType, fileEncSha256B64, timeoutMs }) => { // send a query JSON to obtain the url & auth token to upload our media let uploadInfo = await refreshMediaConn(false) @@ -626,7 +626,7 @@ export const getWAUploadToServer = ( const body = await axios.post( url, - stream, + createReadStream(filePath), { ...options, maxRedirects: 0, diff --git a/src/Utils/messages.ts b/src/Utils/messages.ts index 510d3dc..f305350 100644 --- a/src/Utils/messages.ts +++ b/src/Utils/messages.ts @@ -177,7 +177,7 @@ export const prepareWAMessageMedia = async( const [{ mediaUrl, directPath }] = await Promise.all([ (async() => { const result = await options.upload( - createReadStream(encFilePath), + encFilePath, { fileEncSha256B64, mediaType, timeoutMs: options.mediaUploadTimeoutMs } ) logger?.debug({ mediaType, cacheableKey }, 'uploaded media') From fb83e4799daed870e1bab05ffd4e3471b4e2323d Mon Sep 17 00:00:00 2001 From: canove Date: Tue, 13 May 2025 18:35:29 -0300 Subject: [PATCH 5/5] chore: remove unused variables --- src/Utils/business.ts | 2 +- src/Utils/messages-media.ts | 2 +- src/Utils/messages.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Utils/business.ts b/src/Utils/business.ts index 5c0bb2b..9cf81ac 100644 --- a/src/Utils/business.ts +++ b/src/Utils/business.ts @@ -6,7 +6,7 @@ import { join } from 'path' import { CatalogCollection, CatalogStatus, OrderDetails, OrderProduct, Product, ProductCreate, ProductUpdate, WAMediaUpload, WAMediaUploadFunction } from '../Types' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildren, getBinaryNodeChildString } from '../WABinary' import { generateMessageIDV2 } from './generics' -import { getStream, getUrlFromDirectPath, toReadable } from './messages-media' +import { getStream, getUrlFromDirectPath } from './messages-media' export const parseCatalogNode = (node: BinaryNode) => { const catalogNode = getBinaryNodeChild(node, 'product_catalog') diff --git a/src/Utils/messages-media.ts b/src/Utils/messages-media.ts index 7dd30cb..3befdb9 100644 --- a/src/Utils/messages-media.ts +++ b/src/Utils/messages-media.ts @@ -11,7 +11,7 @@ import { Readable, Transform } from 'stream' import { URL } from 'url' import { proto } from '../../WAProto' import { DEFAULT_ORIGIN, MEDIA_HKDF_KEY_MAPPING, MEDIA_PATH_MAP } from '../Defaults' -import { BaileysEventMap, DownloadableMessage, MediaConnInfo, MediaDecryptionKeyInfo, MediaType, MessageType, SocketConfig, WAGenericMediaMessage, WAMediaPayloadURL, WAMediaUpload, WAMediaUploadFunction, WAMessageContent } from '../Types' +import { BaileysEventMap, DownloadableMessage, MediaConnInfo, MediaDecryptionKeyInfo, MediaType, MessageType, SocketConfig, WAGenericMediaMessage, WAMediaUpload, WAMediaUploadFunction, WAMessageContent } from '../Types' import { BinaryNode, getBinaryNodeChild, getBinaryNodeChildBuffer, jidNormalizedUser } from '../WABinary' import { aesDecryptGCM, aesEncryptGCM, hkdf } from './crypto' import { generateMessageIDV2 } from './generics' diff --git a/src/Utils/messages.ts b/src/Utils/messages.ts index f305350..10ea978 100644 --- a/src/Utils/messages.ts +++ b/src/Utils/messages.ts @@ -1,7 +1,7 @@ import { Boom } from '@hapi/boom' import axios from 'axios' import { randomBytes } from 'crypto' -import { createReadStream, promises as fs } from 'fs' +import { promises as fs } from 'fs' import { type Transform } from 'stream' import { proto } from '../../WAProto' import { MEDIA_KEYS, URL_REGEX, WA_DEFAULT_EPHEMERAL } from '../Defaults'