From e0e7d40847e8c01bbf18680bfba1682293168d5e Mon Sep 17 00:00:00 2001 From: BochilGaming <79433517+BochilGaming@users.noreply.github.com> Date: Sun, 9 Jul 2023 04:43:32 +0700 Subject: [PATCH] feat: Add queue in enc/dec group message (#191) --- WASignalGroup/group_cipher.js | 62 ++++++++++++++++----------- WASignalGroup/queue_job.js | 69 ++++++++++++++++++++++++++++++ WASignalGroup/sender_key_record.js | 6 ++- 3 files changed, 111 insertions(+), 26 deletions(-) create mode 100644 WASignalGroup/queue_job.js diff --git a/WASignalGroup/group_cipher.js b/WASignalGroup/group_cipher.js index 3c126f4..2733fa8 100644 --- a/WASignalGroup/group_cipher.js +++ b/WASignalGroup/group_cipher.js @@ -1,3 +1,4 @@ +const queue_job = require('./queue_job'); const SenderKeyMessage = require('./sender_key_message'); const crypto = require('libsignal/src/crypto'); @@ -7,11 +8,22 @@ class GroupCipher { this.senderKeyName = senderKeyName; } + queueJob(awaitable) { + return queue_job(this.senderKeyName.toString(), awaitable) + } + async encrypt(paddedPlaintext) { - try { + return await this.queueJob(async () => { const record = await this.senderKeyStore.loadSenderKey(this.senderKeyName); + if (!record) { + throw new Error("No SenderKeyRecord found for encryption") + } const senderKeyState = record.getSenderKeyState(); - const senderKey = senderKeyState.getSenderChainKey().getSenderMessageKey(); + if (!senderKeyState) { + throw new Error("No session to encrypt message"); + } + const iteration = senderKeyState.getSenderChainKey().getIteration() + const senderKey = this.getSenderKey(senderKeyState, iteration === 0 ? 0 : iteration + 1) const ciphertext = await this.getCipherText( senderKey.getIv(), @@ -25,35 +37,37 @@ class GroupCipher { ciphertext, senderKeyState.getSigningKeyPrivate() ); - senderKeyState.setSenderChainKey(senderKeyState.getSenderChainKey().getNext()); await this.senderKeyStore.storeSenderKey(this.senderKeyName, record); - return senderKeyMessage.serialize(); - } catch (e) { - //console.log(e.stack); - throw new Error('NoSessionException'); - } + return senderKeyMessage.serialize() + }) } async decrypt(senderKeyMessageBytes) { - const record = await this.senderKeyStore.loadSenderKey(this.senderKeyName); - if (!record) throw new Error(`No sender key for: ${this.senderKeyName}`); + return await this.queueJob(async () => { + const record = await this.senderKeyStore.loadSenderKey(this.senderKeyName); + if (!record) { + throw new Error("No SenderKeyRecord found for decryption") + } + const senderKeyMessage = new SenderKeyMessage(null, null, null, null, senderKeyMessageBytes); + const senderKeyState = record.getSenderKeyState(senderKeyMessage.getKeyId()); + if (!senderKeyState) { + throw new Error("No session found to decrypt message") + } - const senderKeyMessage = new SenderKeyMessage(null, null, null, null, senderKeyMessageBytes); + senderKeyMessage.verifySignature(senderKeyState.getSigningKeyPublic()); + const senderKey = this.getSenderKey(senderKeyState, senderKeyMessage.getIteration()); + // senderKeyState.senderKeyStateStructure.senderSigningKey.private = - const senderKeyState = record.getSenderKeyState(senderKeyMessage.getKeyId()); - senderKeyMessage.verifySignature(senderKeyState.getSigningKeyPublic()); - const senderKey = this.getSenderKey(senderKeyState, senderKeyMessage.getIteration()); - // senderKeyState.senderKeyStateStructure.senderSigningKey.private = + const plaintext = await this.getPlainText( + senderKey.getIv(), + senderKey.getCipherKey(), + senderKeyMessage.getCipherText() + ); - const plaintext = await this.getPlainText( - senderKey.getIv(), - senderKey.getCipherKey(), - senderKeyMessage.getCipherText() - ); + await this.senderKeyStore.storeSenderKey(this.senderKeyName, record); - await this.senderKeyStore.storeSenderKey(this.senderKeyName, record); - - return plaintext; + return plaintext; + }) } getSenderKey(senderKeyState, iteration) { @@ -67,7 +81,7 @@ class GroupCipher { ); } - if (senderChainKey.getIteration() - iteration > 2000) { + if (iteration - senderChainKey.getIteration() > 2000) { throw new Error('Over 2000 messages into the future!'); } diff --git a/WASignalGroup/queue_job.js b/WASignalGroup/queue_job.js new file mode 100644 index 0000000..df0c324 --- /dev/null +++ b/WASignalGroup/queue_job.js @@ -0,0 +1,69 @@ +// vim: ts=4:sw=4:expandtab + +/* + * jobQueue manages multiple queues indexed by device to serialize + * session io ops on the database. + */ +'use strict'; + + +const _queueAsyncBuckets = new Map(); +const _gcLimit = 10000; + +async function _asyncQueueExecutor(queue, cleanup) { + let offt = 0; + while (true) { + let limit = Math.min(queue.length, _gcLimit); // Break up thundering hurds for GC duty. + for (let i = offt; i < limit; i++) { + const job = queue[i]; + try { + job.resolve(await job.awaitable()); + } catch (e) { + job.reject(e); + } + } + if (limit < queue.length) { + /* Perform lazy GC of queue for faster iteration. */ + if (limit >= _gcLimit) { + queue.splice(0, limit); + offt = 0; + } else { + offt = limit; + } + } else { + break; + } + } + cleanup(); +} + +module.exports = function (bucket, awaitable) { + /* Run the async awaitable only when all other async calls registered + * here have completed (or thrown). The bucket argument is a hashable + * key representing the task queue to use. */ + if (!awaitable.name) { + // Make debuging easier by adding a name to this function. + Object.defineProperty(awaitable, 'name', { writable: true }); + if (typeof bucket === 'string') { + awaitable.name = bucket; + } else { + console.warn("Unhandled bucket type (for naming):", typeof bucket, bucket); + } + } + let inactive; + if (!_queueAsyncBuckets.has(bucket)) { + _queueAsyncBuckets.set(bucket, []); + inactive = true; + } + const queue = _queueAsyncBuckets.get(bucket); + const job = new Promise((resolve, reject) => queue.push({ + awaitable, + resolve, + reject + })); + if (inactive) { + /* An executor is not currently active; Start one now. */ + _asyncQueueExecutor(queue, () => _queueAsyncBuckets.delete(bucket)); + } + return job; +}; \ No newline at end of file diff --git a/WASignalGroup/sender_key_record.js b/WASignalGroup/sender_key_record.js index e17f290..8112efa 100644 --- a/WASignalGroup/sender_key_record.js +++ b/WASignalGroup/sender_key_record.js @@ -22,18 +22,20 @@ class SenderKeyRecord { } getSenderKeyState(keyId) { - if (!keyId && this.senderKeyStates.length) return this.senderKeyStates[0]; + if (!keyId && this.senderKeyStates.length) return this.senderKeyStates[this.senderKeyStates.length - 1]; for (let i = 0; i < this.senderKeyStates.length; i++) { const state = this.senderKeyStates[i]; if (state.getKeyId() === keyId) { return state; } } - throw new Error(`No keys for: ${keyId}`); } addSenderKeyState(id, iteration, chainKey, signatureKey) { this.senderKeyStates.push(new SenderKeyState(id, iteration, chainKey, null, signatureKey)); + if (this.senderKeyStates.length > 5) { + this.senderKeyStates.shift() + } } setSenderKeyState(id, iteration, chainKey, keyPair) {