Stream uploads + downloads + allow for remote url uploads

- Switch to using got
- Use encryption/decryption streams for speed & lesser memory consumption
- Allow for stream based download & simultaneous upload of media
This commit is contained in:
Adhiraj Singh
2021-01-13 22:48:28 +05:30
parent 500805236a
commit 0344d6336c
19 changed files with 501 additions and 146 deletions

View File

@@ -1,5 +1,5 @@
import {WAConnection as Base} from './5.User'
import {promises as fs} from 'fs'
import {createReadStream, promises as fs} from 'fs'
import {
MessageOptions,
MessageType,
@@ -9,10 +9,11 @@ import {
WALocationMessage,
WAContactMessage,
WATextMessage,
WAMessageContent, WAMetric, WAFlag, WAMessage, BaileysError, WA_MESSAGE_STATUS_TYPE, WAMessageProto, MediaConnInfo, MessageTypeProto, URL_REGEX, WAUrlInfo, WA_DEFAULT_EPHEMERAL
WAMessageContent, WAMetric, WAFlag, WAMessage, BaileysError, WA_MESSAGE_STATUS_TYPE, WAMessageProto, MediaConnInfo, MessageTypeProto, URL_REGEX, WAUrlInfo, WA_DEFAULT_EPHEMERAL, WAMediaUpload
} from './Constants'
import { generateMessageID, sha256, hmacSign, aesEncrypWithIV, randomBytes, generateThumbnail, getMediaKeys, decodeMediaMessageBuffer, extensionForMediaMessage, whatsappID, unixTimestampSeconds, getAudioDuration, newMessagesDB } from './Utils'
import { generateMessageID, extensionForMediaMessage, whatsappID, unixTimestampSeconds, getAudioDuration, newMessagesDB, encryptedStream, decryptMediaMessageBuffer, generateThumbnail } from './Utils'
import { Mutex } from './Mutex'
import { Readable } from 'stream'
export class WAConnection extends Base {
/**
@@ -116,9 +117,7 @@ export class WAConnection extends Base {
return WAMessageProto.Message.fromObject(content)
}
/** Prepare a media message for sending */
async prepareMessageMedia(buffer: Buffer, mediaType: MessageType, options: MessageOptions = {}) {
await this.waitForConnection ()
async prepareMessageMedia(media: WAMediaUpload, mediaType: MessageType, options: MessageOptions = {}) {
if (mediaType === MessageType.document && !options.mimetype) {
throw new Error('mimetype required to send a document')
}
@@ -133,33 +132,31 @@ export class WAConnection extends Base {
isGIF = true
options.mimetype = MimetypeMap[MessageType.video]
}
// generate a media key
const mediaKey = randomBytes(32)
const mediaKeys = getMediaKeys(mediaKey, mediaType)
const enc = aesEncrypWithIV(buffer, mediaKeys.cipherKey, mediaKeys.iv)
const mac = hmacSign(Buffer.concat([mediaKeys.iv, enc]), mediaKeys.macKey).slice(0, 10)
const body = Buffer.concat([enc, mac]) // body is enc + mac
const fileSha256 = sha256(buffer)
const fileEncSha256 = sha256(body)
const {
mediaKey,
encBodyPath,
bodyPath,
fileEncSha256,
fileSha256,
fileLength
} = await encryptedStream(media, mediaType)
// url safe Base64 encode the SHA256 hash of the body
const fileEncSha256B64 = encodeURIComponent(
fileEncSha256
.toString('base64')
.replace(/\+/g, '-')
.replace(/\//g, '_')
.replace(/\=+$/, '')
)
await generateThumbnail(buffer, mediaType, options)
const fileEncSha256B64 = encodeURIComponent(
fileEncSha256.toString('base64')
.replace(/\+/g, '-')
.replace(/\//g, '_')
.replace(/\=+$/, '')
)
await generateThumbnail(bodyPath, mediaType, options)
if (mediaType === MessageType.audio && !options.duration) {
try {
options.duration = await getAudioDuration (buffer)
options.duration = await getAudioDuration(bodyPath)
} catch (error) {
this.logger.debug ({ error }, 'failed to obtain audio duration: ' + error.message)
}
}
// send a query JSON to obtain the url & auth token to upload our media
let json = await this.refreshMediaConn (options.forceNewMediaOptions)
let json = await this.refreshMediaConn(options.forceNewMediaOptions)
let mediaUrl: string
for (let host of json.hosts) {
@@ -167,8 +164,14 @@ export class WAConnection extends Base {
const url = `https://${host.hostname}${MediaPathMap[mediaType]}/${fileEncSha256B64}?auth=${auth}&token=${fileEncSha256B64}`
try {
const urlFetch = await this.fetchRequest(url, 'POST', body, options.uploadAgent, { 'Content-Type': 'application/octet-stream' })
const result = await urlFetch.json()
const {body: responseText} = await this.fetchRequest(
url,
'POST',
createReadStream(encBodyPath),
options.uploadAgent,
{ 'Content-Type': 'application/octet-stream' }
)
const result = JSON.parse(responseText)
mediaUrl = result?.url
if (mediaUrl) break
@@ -178,7 +181,7 @@ export class WAConnection extends Base {
}
} catch (error) {
const isLast = host.hostname === json.hosts[json.hosts.length-1].hostname
this.logger.error (`Error in uploading to ${host.hostname}${isLast ? '' : ', retrying...'}`)
this.logger.error (`Error in uploading to ${host.hostname} (${error}) ${isLast ? '' : ', retrying...'}`)
}
}
if (!mediaUrl) throw new Error('Media upload failed on all hosts')
@@ -191,7 +194,7 @@ export class WAConnection extends Base {
mimetype: options.mimetype,
fileEncSha256: fileEncSha256,
fileSha256: fileSha256,
fileLength: buffer.length,
fileLength: fileLength,
seconds: options.duration,
fileName: options.filename || 'file',
gifPlayback: isGIF || undefined,
@@ -317,24 +320,39 @@ export class WAConnection extends Base {
})
Object.keys (response[1]).forEach (key => content[key] = response[1][key]) // update message
}
async downloadMediaMessage (message: WAMessage): Promise<Buffer>
async downloadMediaMessage (message: WAMessage, type: 'buffer'): Promise<Buffer>
async downloadMediaMessage (message: WAMessage, type: 'stream'): Promise<Readable>
/**
* Securely downloads the media from the message.
* Renews the download url automatically, if necessary.
*/
@Mutex (message => message?.key?.id)
async downloadMediaMessage (message: WAMessage) {
async downloadMediaMessage (message: WAMessage, type: 'buffer' | 'stream' = 'buffer') {
let mContent = message.message?.ephemeralMessage?.message || message.message
if (!mContent) throw new BaileysError('No message present', { status: 400 })
const downloadMediaMessage = async () => {
const stream = await decryptMediaMessageBuffer(mContent)
if(type === 'buffer') {
let buffer = Buffer.from([])
for await(const chunk of stream) {
buffer = Buffer.concat([buffer, chunk])
}
return buffer
}
return stream
}
try {
const buff = await decodeMediaMessageBuffer (mContent, this.fetchRequest)
const buff = await downloadMediaMessage()
return buff
} catch (error) {
if (error instanceof BaileysError && error.status === 404) { // media needs to be updated
this.logger.info (`updating media of message: ${message.key.id}`)
await this.updateMediaMessage (message)
mContent = message.message?.ephemeralMessage?.message || message.message
const buff = await decodeMediaMessageBuffer (mContent, this.fetchRequest)
const buff = await downloadMediaMessage()
return buff
}
throw error
@@ -348,10 +366,11 @@ export class WAConnection extends Base {
* @param attachExtension should the parsed extension be applied automatically to the file
*/
async downloadAndSaveMediaMessage (message: WAMessage, filename: string, attachExtension: boolean=true) {
const buffer = await this.downloadMediaMessage (message)
const extension = extensionForMediaMessage (message.message)
const trueFileName = attachExtension ? (filename + '.' + extension) : filename
await fs.writeFile (trueFileName, buffer)
const buffer = await this.downloadMediaMessage(message)
await fs.writeFile(trueFileName, buffer)
return trueFileName
}
/** Query a string to check if it has a url, if it does, return required extended text message */