fix: dbQueue pouchdb

This commit is contained in:
vaultec 2023-03-21 22:37:51 -07:00
parent 6f7c758378
commit 2c916eb02a
No known key found for this signature in database
GPG Key ID: 5D3801A306E5CB25
2 changed files with 53 additions and 37 deletions

View File

@ -141,14 +141,16 @@ export class EncoderService {
// const tileDoc = await TileDocument.load(this.self.ceramic, streamId)
// const content = tileDoc.content
console.log('updateJob - function', updateData)
await this.pouch.upsert(job_id, (doc) => {
console.log('updateJob - crdt', updateData)
for (let [key, value] of Object.entries(updateData)) {
doc[key] = value
}
doc['updated_at'] = new Date().toISOString()
return doc;
await this.self.gatewayClient.dbQueue.add(async () => {
await this.pouch.upsert(job_id, (doc) => {
console.log('updateJob - crdt', updateData)
for (let [key, value] of Object.entries(updateData)) {
doc[key] = value
}
doc['updated_at'] = new Date().toISOString()
return doc;
})
})
const docNew = await this.pouch.get(job_id)
// for (let [key, value] of Object.entries(updateData)) {
@ -180,10 +182,12 @@ export class EncoderService {
const startTime = new Date();
let download_pct = 0;
const slowUpdate = setInterval(() => {
this.pouch.upsert(jobInfo.id, (doc) => {
doc.download_pct = download_pct;
return doc
})
this.self.gatewayClient.dbQueue.add(async () => [
this.pouch.upsert(jobInfo.id, (doc) => {
doc.download_pct = download_pct;
return doc
})
])
}, 500)
const downloadProcess = execa('wget', [sourceUrl, '-O', Path.join(downloadFolder, `${jobInfo.id}_src.mp4`)], {
// on
@ -205,6 +209,7 @@ export class EncoderService {
}
}
await downloadProcess
clearInterval(slowUpdate)
// console.log(stdout)
// const downloader = new Downloader({
// url: sourceUrl,
@ -294,13 +299,15 @@ export class EncoderService {
let progress1;
const progressInterval = setInterval(() => {
this.pouch.upsert(jobInfo.id, (doc) => {
doc.progress = progress1
if(doc.progressPct !== progressPct1) {
doc.last_updated_diff = new Date()
}
doc.progressPct = progressPct1
return doc
this.self.gatewayClient.dbQueue.add(async () => {
await this.pouch.upsert(jobInfo.id, (doc) => {
doc.progress = progress1
if(doc.progressPct !== progressPct1) {
doc.last_updated_diff = new Date()
}
doc.progressPct = progressPct1
return doc
})
})
}, 500)
@ -522,16 +529,18 @@ export class EncoderService {
// const fakeStreamId = id
await this.pouch.upsert(initialJob.id, (doc) => {
for(let key in initialJob) {
doc[key] = initialJob[key]
}
// doc['streamId'] = tileDocument.id.toString()
doc['streamId'] = id
doc['status'] = EncodeStatus.PENDING
doc['progress'] = 0
return doc
await this.self.gatewayClient.dbQueue.add(async () => {
await this.pouch.upsert(initialJob.id, (doc) => {
for(let key in initialJob) {
doc[key] = initialJob[key]
}
// doc['streamId'] = tileDocument.id.toString()
doc['streamId'] = id
doc['status'] = EncodeStatus.PENDING
doc['progress'] = 0
return doc
})
})
return {
id,

View File

@ -11,6 +11,7 @@ export class GatewayClient {
self: CoreService
apiUrl: string
jobQueue: PQueue
dbQueue: PQueue
activeJobs: Record<string, Object>
constructor(self) {
@ -22,6 +23,8 @@ export class GatewayClient {
this.jobQueue = new PQueue({ concurrency: queue_concurrency })
this.dbQueue = new PQueue({ concurrency: 1 })
this.activeJobs = {}
}
@ -84,11 +87,13 @@ export class GatewayClient {
}),
})
await this.self.encoder.pouch.upsert('pin-allocation', (doc) => {
doc[job_id] = {
cid: jobUpdate.content.outCid
}
return doc;
await this.dbQueue.add(async () => {
await this.self.encoder.pouch.upsert('pin-allocation', (doc) => {
doc[job_id] = {
cid: jobUpdate.content.outCid
}
return doc;
})
})
this.ipfsBootstrap().catch((e) => {
console.log(e)
@ -238,9 +243,11 @@ export class GatewayClient {
} catch {
//If not pinned
}
await this.self.encoder.pouch.upsert('pin-allocation', (doc) => {
delete doc[job_id]
return doc;
await this.dbQueue.add(async () => {
await this.self.encoder.pouch.upsert('pin-allocation', (doc) => {
delete doc[job_id]
return doc;
})
})
}
}