perf: experimental do not use fs for enc stream

This commit is contained in:
Adhiraj Singh
2021-12-17 18:27:03 +05:30
parent e51bbc4893
commit 38a44be006
3 changed files with 78 additions and 53 deletions

View File

@@ -140,7 +140,7 @@ export type MessageGenerationOptionsFromContent = MiscMessageGenerationOptions &
userJid: string
}
export type WAMediaUploadFunction = (readStream: ReadStream, opts: { fileEncSha256B64: string, mediaType: MediaType, timeoutMs?: number }) => Promise<{ mediaUrl: string, directPath: string }>
export type WAMediaUploadFunction = (readStream: Readable, opts: { fileEncSha256B64: string, mediaType: MediaType, timeoutMs?: number }) => Promise<{ mediaUrl: string, directPath: string }>
export type MediaGenerationOptions = {
logger?: Logger

View File

@@ -4,7 +4,7 @@ import type { Options, Response } from 'got'
import { Boom } from '@hapi/boom'
import * as Crypto from 'crypto'
import { Readable, Transform } from 'stream'
import { createReadStream, createWriteStream, promises as fs, WriteStream } from 'fs'
import { createReadStream, createWriteStream, promises as fs, ReadStream, WriteStream } from 'fs'
import { exec } from 'child_process'
import { tmpdir } from 'os'
import { URL } from 'url'
@@ -219,14 +219,20 @@ export const getGotStream = async(url: string | URL, options: Options & { isStre
})
return fetched
}
export const encryptedStream = async(media: WAMediaUpload, mediaType: MediaType, saveOriginalFileIfRequired = true) => {
export const encryptedStream = async(
media: WAMediaUpload,
mediaType: MediaType,
saveOriginalFileIfRequired = true
) => {
const { stream, type } = await getStream(media)
const mediaKey = Crypto.randomBytes(32)
const {cipherKey, iv, macKey} = getMediaKeys(mediaKey, mediaType)
// random name
const encBodyPath = join(getTmpFilesDirectory(), mediaType + generateMessageID() + '.enc')
const encWriteStream = createWriteStream(encBodyPath)
//const encBodyPath = join(getTmpFilesDirectory(), mediaType + generateMessageID() + '.enc')
// const encWriteStream = createWriteStream(encBodyPath)
const encWriteStream = new Readable({ read: () => {} })
let bodyPath: string
let writeStream: WriteStream
let didSaveToTmpPath = false
@@ -247,38 +253,50 @@ export const encryptedStream = async(media: WAMediaUpload, mediaType: MediaType,
const onChunk = (buff: Buffer) => {
sha256Enc = sha256Enc.update(buff)
hmac = hmac.update(buff)
encWriteStream.write(buff)
encWriteStream.push(buff)
}
for await(const data of stream) {
fileLength += data.length
sha256Plain = sha256Plain.update(data)
if(writeStream) {
if(!writeStream.write(data)) await once(writeStream, 'drain')
try {
for await(const data of stream) {
fileLength += data.length
sha256Plain = sha256Plain.update(data)
if(writeStream) {
if(!writeStream.write(data)) await once(writeStream, 'drain')
}
onChunk(aes.update(data))
}
onChunk(aes.update(data))
}
onChunk(aes.final())
const mac = hmac.digest().slice(0, 10)
sha256Enc = sha256Enc.update(mac)
onChunk(aes.final())
const fileSha256 = sha256Plain.digest()
const fileEncSha256 = sha256Enc.digest()
const mac = hmac.digest().slice(0, 10)
sha256Enc = sha256Enc.update(mac)
const fileSha256 = sha256Plain.digest()
const fileEncSha256 = sha256Enc.digest()
encWriteStream.push(mac)
encWriteStream.push(null)
encWriteStream.write(mac)
encWriteStream.end()
writeStream && writeStream.end()
return {
mediaKey,
encWriteStream,
bodyPath,
mac,
fileEncSha256,
fileSha256,
fileLength,
didSaveToTmpPath
}
} catch(error) {
encWriteStream.destroy(error)
writeStream.destroy(error)
aes.destroy(error)
hmac.destroy(error)
sha256Plain.destroy(error)
sha256Enc.destroy(error)
writeStream && writeStream.end()
return {
mediaKey,
encBodyPath,
bodyPath,
mac,
fileEncSha256,
fileSha256,
fileLength,
didSaveToTmpPath
throw error
}
}

View File

@@ -94,7 +94,7 @@ export const prepareWAMessageMedia = async(
const requiresOriginalForSomeProcessing = requiresDurationComputation || requiresThumbnailComputation
const {
mediaKey,
encBodyPath,
encWriteStream,
bodyPath,
fileEncSha256,
fileSha256,
@@ -108,28 +108,35 @@ export const prepareWAMessageMedia = async(
.replace(/\//g, '_')
.replace(/\=+$/, '')
)
try {
if(requiresThumbnailComputation) {
uploadData.jpegThumbnail = await generateThumbnail(bodyPath, mediaType as any, options)
const [{ mediaUrl, directPath }] = await Promise.all([
(() => {
return options.upload(
encWriteStream,
{ fileEncSha256B64, mediaType, timeoutMs: options.mediaUploadTimeoutMs }
)
})(),
(async() => {
try {
if(requiresThumbnailComputation) {
uploadData.jpegThumbnail = await generateThumbnail(bodyPath, mediaType as any, options)
}
if (requiresDurationComputation) {
uploadData.seconds = await getAudioDuration(bodyPath)
}
} catch (error) {
options.logger?.info({ trace: error.stack }, 'failed to obtain extra info')
}
})(),
])
.finally(
async() => {
encWriteStream.destroy()
// remove tmp files
didSaveToTmpPath && bodyPath && await fs.unlink(bodyPath)
}
if (requiresDurationComputation) {
uploadData.seconds = await getAudioDuration(bodyPath)
}
} catch (error) {
options.logger?.info({ trace: error.stack }, 'failed to obtain extra info')
}
const {mediaUrl, directPath} = await options.upload(
createReadStream(encBodyPath),
{ fileEncSha256B64, mediaType, timeoutMs: options.mediaUploadTimeoutMs }
)
// remove tmp files
await Promise.all(
[
fs.unlink(encBodyPath),
didSaveToTmpPath && bodyPath && fs.unlink(bodyPath)
]
.filter(Boolean)
)
delete uploadData.media
const obj = WAProto.Message.fromObject({