Merge pull request #6 from andresayac/master

added latest pending baileys commit
This commit is contained in:
burstfreeze
2023-04-22 20:01:07 +02:00
committed by GitHub
4 changed files with 165 additions and 40 deletions

View File

@@ -86,39 +86,33 @@ export const addTransactionCapability = (
logger: Logger,
{ maxCommitRetries, delayBetweenTriesMs }: TransactionCapabilityOptions
): SignalKeyStoreWithTransaction => {
let inTransaction = false
// number of queries made to the DB during the transaction
// only there for logging purposes
let dbQueriesInTransaction = 0
let transactionCache: SignalDataSet = { }
let mutations: SignalDataSet = { }
/**
* prefetches some data and stores in memory,
* useful if these data points will be used together often
* */
const prefetch = async<T extends keyof SignalDataTypeMap>(type: T, ids: string[]) => {
const dict = transactionCache[type]
const idsRequiringFetch = dict
? ids.filter(item => typeof dict[item] !== 'undefined')
: ids
// only fetch if there are any items to fetch
if(idsRequiringFetch.length) {
dbQueriesInTransaction += 1
const result = await state.get(type, idsRequiringFetch)
transactionCache[type] ||= {}
transactionCache[type] = Object.assign(
transactionCache[type]!,
result
)
}
}
let transactionsInProgress = 0
return {
get: async(type, ids) => {
if(inTransaction) {
await prefetch(type, ids)
if(isInTransaction()) {
const dict = transactionCache[type]
const idsRequiringFetch = dict
? ids.filter(item => typeof dict[item] === 'undefined')
: ids
// only fetch if there are any items to fetch
if(idsRequiringFetch.length) {
dbQueriesInTransaction += 1
const result = await state.get(type, idsRequiringFetch)
transactionCache[type] ||= {}
Object.assign(
transactionCache[type]!,
result
)
}
return ids.reduce(
(dict, id) => {
const value = transactionCache[type]?.[id]
@@ -134,7 +128,7 @@ export const addTransactionCapability = (
}
},
set: data => {
if(inTransaction) {
if(isInTransaction()) {
logger.trace({ types: Object.keys(data) }, 'caching in transaction')
for(const key in data) {
transactionCache[key] = transactionCache[key] || { }
@@ -147,17 +141,18 @@ export const addTransactionCapability = (
return state.set(data)
}
},
isInTransaction: () => inTransaction,
transaction: async(work) => {
// if we're already in a transaction,
// just execute what needs to be executed -- no commit required
if(inTransaction) {
await work()
} else {
isInTransaction,
async transaction(work) {
let result: Awaited<ReturnType<typeof work>>
transactionsInProgress += 1
if(transactionsInProgress === 1) {
logger.trace('entering transaction')
inTransaction = true
try {
await work()
}
try {
result = await work()
// commit if this is the outermost transaction
if(transactionsInProgress === 1) {
if(Object.keys(mutations).length) {
logger.trace('committing transaction')
// retry mechanism to ensure we've some recovery
@@ -177,15 +172,23 @@ export const addTransactionCapability = (
} else {
logger.trace('no mutations in transaction')
}
} finally {
inTransaction = false
}
} finally {
transactionsInProgress -= 1
if(transactionsInProgress === 0) {
transactionCache = { }
mutations = { }
dbQueriesInTransaction = 0
}
}
return result
}
}
function isInTransaction() {
return transactionsInProgress > 0
}
}
export const initAuthCreds = (): AuthenticationCreds => {