From 38a44be006d4d43c9a283bae026c21746fca5ee6 Mon Sep 17 00:00:00 2001 From: Adhiraj Singh Date: Fri, 17 Dec 2021 18:27:03 +0530 Subject: [PATCH] perf: experimental do not use fs for enc stream --- src/Types/Message.ts | 2 +- src/Utils/messages-media.ts | 80 +++++++++++++++++++++++-------------- src/Utils/messages.ts | 49 +++++++++++++---------- 3 files changed, 78 insertions(+), 53 deletions(-) diff --git a/src/Types/Message.ts b/src/Types/Message.ts index 8ae9454..2162545 100644 --- a/src/Types/Message.ts +++ b/src/Types/Message.ts @@ -140,7 +140,7 @@ export type MessageGenerationOptionsFromContent = MiscMessageGenerationOptions & userJid: string } -export type WAMediaUploadFunction = (readStream: ReadStream, opts: { fileEncSha256B64: string, mediaType: MediaType, timeoutMs?: number }) => Promise<{ mediaUrl: string, directPath: string }> +export type WAMediaUploadFunction = (readStream: Readable, opts: { fileEncSha256B64: string, mediaType: MediaType, timeoutMs?: number }) => Promise<{ mediaUrl: string, directPath: string }> export type MediaGenerationOptions = { logger?: Logger diff --git a/src/Utils/messages-media.ts b/src/Utils/messages-media.ts index af6b227..ee3ed5c 100644 --- a/src/Utils/messages-media.ts +++ b/src/Utils/messages-media.ts @@ -4,7 +4,7 @@ import type { Options, Response } from 'got' import { Boom } from '@hapi/boom' import * as Crypto from 'crypto' import { Readable, Transform } from 'stream' -import { createReadStream, createWriteStream, promises as fs, WriteStream } from 'fs' +import { createReadStream, createWriteStream, promises as fs, ReadStream, WriteStream } from 'fs' import { exec } from 'child_process' import { tmpdir } from 'os' import { URL } from 'url' @@ -219,14 +219,20 @@ export const getGotStream = async(url: string | URL, options: Options & { isStre }) return fetched } -export const encryptedStream = async(media: WAMediaUpload, mediaType: MediaType, saveOriginalFileIfRequired = true) => { +export const encryptedStream = async( + media: WAMediaUpload, + mediaType: MediaType, + saveOriginalFileIfRequired = true +) => { const { stream, type } = await getStream(media) const mediaKey = Crypto.randomBytes(32) const {cipherKey, iv, macKey} = getMediaKeys(mediaKey, mediaType) // random name - const encBodyPath = join(getTmpFilesDirectory(), mediaType + generateMessageID() + '.enc') - const encWriteStream = createWriteStream(encBodyPath) + //const encBodyPath = join(getTmpFilesDirectory(), mediaType + generateMessageID() + '.enc') + // const encWriteStream = createWriteStream(encBodyPath) + const encWriteStream = new Readable({ read: () => {} }) + let bodyPath: string let writeStream: WriteStream let didSaveToTmpPath = false @@ -247,38 +253,50 @@ export const encryptedStream = async(media: WAMediaUpload, mediaType: MediaType, const onChunk = (buff: Buffer) => { sha256Enc = sha256Enc.update(buff) hmac = hmac.update(buff) - encWriteStream.write(buff) + encWriteStream.push(buff) } - for await(const data of stream) { - fileLength += data.length - sha256Plain = sha256Plain.update(data) - if(writeStream) { - if(!writeStream.write(data)) await once(writeStream, 'drain') + + try { + for await(const data of stream) { + fileLength += data.length + sha256Plain = sha256Plain.update(data) + if(writeStream) { + if(!writeStream.write(data)) await once(writeStream, 'drain') + } + onChunk(aes.update(data)) } - onChunk(aes.update(data)) - } - onChunk(aes.final()) - - const mac = hmac.digest().slice(0, 10) - sha256Enc = sha256Enc.update(mac) + onChunk(aes.final()) - const fileSha256 = sha256Plain.digest() - const fileEncSha256 = sha256Enc.digest() + const mac = hmac.digest().slice(0, 10) + sha256Enc = sha256Enc.update(mac) + + const fileSha256 = sha256Plain.digest() + const fileEncSha256 = sha256Enc.digest() + + encWriteStream.push(mac) + encWriteStream.push(null) - encWriteStream.write(mac) - encWriteStream.end() + writeStream && writeStream.end() + + return { + mediaKey, + encWriteStream, + bodyPath, + mac, + fileEncSha256, + fileSha256, + fileLength, + didSaveToTmpPath + } + } catch(error) { + encWriteStream.destroy(error) + writeStream.destroy(error) + aes.destroy(error) + hmac.destroy(error) + sha256Plain.destroy(error) + sha256Enc.destroy(error) - writeStream && writeStream.end() - - return { - mediaKey, - encBodyPath, - bodyPath, - mac, - fileEncSha256, - fileSha256, - fileLength, - didSaveToTmpPath + throw error } } diff --git a/src/Utils/messages.ts b/src/Utils/messages.ts index 9238159..47ab76a 100644 --- a/src/Utils/messages.ts +++ b/src/Utils/messages.ts @@ -94,7 +94,7 @@ export const prepareWAMessageMedia = async( const requiresOriginalForSomeProcessing = requiresDurationComputation || requiresThumbnailComputation const { mediaKey, - encBodyPath, + encWriteStream, bodyPath, fileEncSha256, fileSha256, @@ -108,28 +108,35 @@ export const prepareWAMessageMedia = async( .replace(/\//g, '_') .replace(/\=+$/, '') ) - try { - if(requiresThumbnailComputation) { - uploadData.jpegThumbnail = await generateThumbnail(bodyPath, mediaType as any, options) + + const [{ mediaUrl, directPath }] = await Promise.all([ + (() => { + return options.upload( + encWriteStream, + { fileEncSha256B64, mediaType, timeoutMs: options.mediaUploadTimeoutMs } + ) + })(), + (async() => { + try { + if(requiresThumbnailComputation) { + uploadData.jpegThumbnail = await generateThumbnail(bodyPath, mediaType as any, options) + } + if (requiresDurationComputation) { + uploadData.seconds = await getAudioDuration(bodyPath) + } + } catch (error) { + options.logger?.info({ trace: error.stack }, 'failed to obtain extra info') + } + })(), + ]) + .finally( + async() => { + encWriteStream.destroy() + // remove tmp files + didSaveToTmpPath && bodyPath && await fs.unlink(bodyPath) } - if (requiresDurationComputation) { - uploadData.seconds = await getAudioDuration(bodyPath) - } - } catch (error) { - options.logger?.info({ trace: error.stack }, 'failed to obtain extra info') - } - const {mediaUrl, directPath} = await options.upload( - createReadStream(encBodyPath), - { fileEncSha256B64, mediaType, timeoutMs: options.mediaUploadTimeoutMs } - ) - // remove tmp files - await Promise.all( - [ - fs.unlink(encBodyPath), - didSaveToTmpPath && bodyPath && fs.unlink(bodyPath) - ] - .filter(Boolean) ) + delete uploadData.media const obj = WAProto.Message.fromObject({