feat: first working version of encoder daemon

This commit is contained in:
vaultec 2021-10-24 22:47:53 -07:00
parent 5bc6ebb1b6
commit 5c80a36fb1
No known key found for this signature in database
GPG Key ID: 5D3801A306E5CB25
6 changed files with 235 additions and 47 deletions

View File

@ -7,11 +7,18 @@ import MulticastDNS from 'libp2p-mdns'
import Bootstrap from 'libp2p-bootstrap'
import DHT from 'libp2p-kad-dht'
import PeerId from 'peer-id'
import { CeramicClient } from '@ceramicnetwork/http-client'
import Pushable from 'it-pushable'
import {pipe} from 'it-pipe'
import {encode} from './src/modules/libp2p/frame-codec.util'
import {decode, encode} from './src/modules/libp2p/frame-codec.util'
import { MESSAGE_TYPES } from './src/modules/libp2p/messages.model'
import { TileDocument } from '@ceramicnetwork/stream-tile'
import { EncodeStatus } from './src/modules/encoder.model'
import logUpdate from 'log-update';
import cli from 'cli-ux'
const ClientKey =
{
@ -22,8 +29,10 @@ const ClientKey =
void (async () => {
console.log('P2P interface starting up')
const ceramic = new CeramicClient('https://ceramic-clay.3boxlabs.com') //Using the public node for now.
const idListener = await PeerId.createFromJSON(ClientKey)
const libp2p = await Libp2p.create({
peerId: idListener,
@ -35,7 +44,7 @@ void (async () => {
dht: DHT
},
addresses: {
listen: ['/ip4/10.0.1.188/tcp/1446']
listen: ['/ip4/10.0.1.188/tcp/14446']
},
config: {
peerDiscovery: {
@ -73,27 +82,58 @@ void (async () => {
await libp2p.start()
console.log(libp2p.peerId.toJSON())
setInterval(() => {
console.log(libp2p.connections.size)
//console.log(libp2p.connections.size)
}, 5000)
const handler = ({ connection, stream, protocol }) => {
const handler = async ({ connection, stream, protocol }) => {
// use stream or connection according to the needs
console.log(connection, stream, protocol)
for await(let item of stream.source) {
console.log(item)
}
}
libp2p.handle('/spk-video-encoder/1.0.0', handler)
const output = await libp2p.dialProtocol("/ip4/10.0.1.188/tcp/1445/p2p/QmctF7GPunpcLu3Fn77Ypo657TA9GpTiipSDUUMoD2k5Kq", '/spk-video-encoder/1.0.0')
const output = await libp2p.dialProtocol("/ip4/10.0.1.188/tcp/14445/p2p/QmctF7GPunpcLu3Fn77Ypo657TA9GpTiipSDUUMoD2k5Kq", '/spk-video-encoder/1.0.0')
console.log(output)
cli.action.start('Encoding Video')
// do some action...
// stop the spinner
//cli.action.stop() // shows 'starting a process... done'
void (async () => {
let encodeId;
void (async () => {
for await(let item of output.stream.source) {
const decodedMessage = decode(item._bufs[0])
console.log(decodedMessage)
if(decodedMessage.type === MESSAGE_TYPES.RESPONE_ENCODE_JOB) {
encodeId = decodedMessage.streamId
pushable.push(encode({
type: MESSAGE_TYPES.SUBSCRIBE_UPDATE,
streamId: decodedMessage.streamId
}))
let encodeDoc = await TileDocument.load(ceramic, encodeId)
setInterval(async () => {
await encodeDoc.sync()
const contentData = encodeDoc.content as any;
if(contentData.status === EncodeStatus.COMPLETE) {
console.log(`Job complete, IPFS Hash is ${contentData.outCid}`)
cli.action.stop()
process.exit(0)
}
}, 1000)
}
}})()
})()
const pushable = Pushable()
pipe(pushable, output.stream)
pushable.push(encode({
message: "amazing work! Partner! "
type: MESSAGE_TYPES.REQUEST_ENCODE_JOB,
ipfsHash: 'Qma9ZjjtH7fdLWSrMU43tFihvSN59aQdes7f5KW6vGGk6e'
}))
pushable.push(encode({
message: "amazing work! Partner! "
}))
pushable.push(encode({
message: "amazing work! Partner! "
}))
pushable.end()
console.log(output.stream.close())
//pushable.end()
//console.log(output.stream.close())
})()

View File

@ -10,9 +10,14 @@
"@ceramicnetwork/streamid": "^1.3.0",
"@ceramicstudio/idx": "^0.12.2",
"@ipld/dag-cbor": "^6.0.12",
"@nestjs/common": "^8.1.1",
"3id-did-provider": "^1.1.1",
"ajv": "^8.6.3",
"cli-ux": "^5.6.3",
"datastore-fs": "^6.0.1",
"debug": "^4.3.2",
"dids": "^2.4.0",
"dlv": "^1.1.3",
"fluent-ffmpeg": "^2.1.2",
"ipfs-http-client": "^47.0.1",
"it-pipe": "^1.1.0",
@ -30,7 +35,8 @@
"pouchdb-find": "^7.2.2",
"pouchdb-upsert": "^2.2.0",
"tmp": "^0.2.1",
"uuid": "^8.3.2"
"uuid": "^8.3.2",
"yargs": "^17.2.1"
},
"devDependencies": {
"@types/jest": "^26.0.24",

View File

@ -11,6 +11,7 @@ import EventEmitter from 'events'
import PouchDB from 'pouchdb'
import PouchdbFind from 'pouchdb-find'
import PouchdbUpsert from 'pouchdb-upsert'
import { EncodeStatus } from '../encoder.model';
PouchDB.plugin(PouchdbFind);
PouchDB.plugin(PouchdbUpsert);
@ -39,15 +40,7 @@ interface EncodeInput {
type: string
url: string
}
enum EncodeStatus {
PENDING = 'pending',
QUEUED = 'queued',
LOADING = 'loading',
RUNNING = 'running',
FAILED = 'failed',
UPLOADING = 'uploading',
COMPLETE = 'complete',
}
const MAX_BIT_RATE = {
'1080': '2760k',
'720': '1327k',
@ -126,9 +119,16 @@ const tutils = {
export class EncoderService {
self: CoreService
pouch: PouchDB
events: EventEmitter
updateSubscriptions: Set<Object>
constructor(self) {
this.self = self;
this.pouch = new PouchDB("encoder.db");
this.events = new EventEmitter()
this.updateSubscriptions = new Set()
}
async updateJob(streamId, updateData ) {
@ -173,6 +173,7 @@ export class EncoderService {
status: EncodeStatus.RUNNING
})
console.log(jobInfo)
let stage = 0;
let sizes = []
let codecData;
let duration;
@ -181,12 +182,19 @@ export class EncoderService {
sizes.push(profile.size);
ret.size(profile.size);
ret.on('progress', ((progress) => {
//this.events.emit("progress", jobInfo.id, progress)
console.log(progress)
this.pouch.upsert(jobInfo.id, doc => {
console.log(jobInfo)
for(let key in progress) {
progress[key] = progress[key] || 0
}
this.events.emit("job.progress", streamId.toString(), {
stage,
stages: jobInfo.profiles.length,
progress
})
/*this.pouch.upsert(jobInfo.id, doc => {
doc.progress = progress
return doc
})
})*/
}).bind(this))
ret.on('end', () => {
//this.events.emit('done', jobInfo.id)
@ -203,19 +211,21 @@ export class EncoderService {
duration = codecData.duration;
})
})
ret.videoBitrate(MAX_BIT_RATE[String(profile.size.split('x')[1])])
fs.mkdirSync(Path.join(workfolder, `${String(profile.size.split('x')[1])}p`))
//ret.save(path.join(workfolder, `${String(size.split('x')[1])}p`, 'index.m3u8'))
console.log(Path.join(workfolder, `${String(profile.size.split('x')[1])}p`, 'index.m3u8'))
ret.addOption(`-segment_format`, "mpegts")
ret.addOption('-segment_list', Path.join(workfolder, `${String(profile.size.split('x')[1])}p`, 'index.m3u8'))
ret.save(Path.join(workfolder + '/' + `${String(profile.size.split('x')[1])}p`, `${String(profile.size.split('x')[1])}p_%d.ts`))
await promy;
this.pouch.upsert(jobInfo.id, (doc) => {
})
stage = stage + 1;
}
var manifest = this._generateManifest(codecData, sizes)
fs.writeFileSync(Path.join(workfolder, "manifest.m3u8"), manifest)
@ -226,14 +236,18 @@ export class EncoderService {
})
const ipfsHash = await this.self.ipfs.add(globSource(workfolder, {recursive: true} ), {pin:false})
fs.unlink(workfolder, () => {})
console.log(ipfsHash)
this.updateJob(streamId, {
status: EncodeStatus.COMPLETE
status: EncodeStatus.COMPLETE,
outCid: ipfsHash.cid.toString(),
total_size: ipfsHash.size
})
return ipfsHash.cid.toString();
} catch {
} catch (ex) {
this.updateJob(streamId, {
status: EncodeStatus.FAILED
})
fs.unlink(workfolder, () => {})
}
}
async executeJob(jobInfoOrId: Object|string) {
@ -252,6 +266,10 @@ export class EncoderService {
} else {
throw new Error('Invalid input')
}
this.events.on('job.progress', (e, b) => {
console.log('receiving progress here 100%')
console.log(e,b)
})
const out = await this.executeJobRaw(jobInfo, streamId)
console.log(out)
@ -279,7 +297,7 @@ export class EncoderService {
async cleanUpJob() {
}
async createJob(sourceCID, peerId, client_id) {
async createJob(sourceCID, peerId, client_id?) {
const id = uuid();
const tileDocument = await TileDocument.create(this.self.ceramic, {
id,
@ -327,6 +345,8 @@ export class EncoderService {
}
}
async start() {
/*const data = await this.createJob('Qma9ZjjtH7fdLWSrMU43tFihvSN59aQdes7f5KW6vGGk6e', 'QmctF7GPunpcLu3Fn77Ypo657TA9GpTiipSDUUMoD2k5Kq', 'did:3:kjzl6cwe1jw14aijwpxwaa1ybg708bp9n5jqt8q89j6yrdqvt8tfxdxw1q5dpxh')
console.log(data)
this.executeJob(data.streamId)*/
}
}

View File

@ -0,0 +1,18 @@
export interface ffmpegProgress {
frames: number
currentFps?: number
currentKbps?: number
targetSize?: number
timemark: string //FFMPEG timemark
percent: number
}
export enum EncodeStatus {
PENDING = 'pending',
QUEUED = 'queued',
LOADING = 'loading',
RUNNING = 'running',
FAILED = 'failed',
UPLOADING = 'uploading',
COMPLETE = 'complete',
}

View File

@ -9,11 +9,15 @@ import DHT from 'libp2p-kad-dht'
import PeerId from 'peer-id'
import { CoreService } from '../core/core.service'
import { decode } from './frame-codec.util'
import { decode, encode } from './frame-codec.util'
import { MESSAGE_TYPES } from './messages.model'
import pipe from 'it-pipe'
import Pushable from 'it-pushable'
const PEERINFO = {id: 'QmctF7GPunpcLu3Fn77Ypo657TA9GpTiipSDUUMoD2k5Kq',
privKey: 'CAASpwkwggSjAgEAAoIBAQC90BTfJW9ZDNic30Xkr4acCgEZWRmczeT/KVsecK98/qaTm4nvenHzuqXnh+CuBj1UKqHFjifTz6jy1oCSlJEEJgki0N/Vt/9Dkn/bn8Vjts/5M1ZlYbNfPJx6yEaWDClGz43rXtlHXKiwufPJ4dwPKQQZv4EshOEptAhO2913GB8D7/8bkaAlT+bwG+76jG5XkG9Pp0cHytOZWPBFYYRomOnAfDNRmbAK3lF0oyBXPuOd64AB9P/+wVrGrobKOZO5AiQkfBi0lYqx153tZ8CA5JxpPBLcmRoKxMA9Bmar7DjrVBi8fba1x4d3PufLzPwBIFVLV5mpkGMbtgQL74TNAgMBAAECggEAAcCTAMBat8q7kS8qeQL5ziT1f6Nn7h+kdoqOMci+hfvf08sCyfgqZyKY93s0osah+E3wcl9ulLD9EUjTpQbEE/K58N1Ww6VQMPKARanC67m7T8Sejo8JVd68XxHMPQRduS6fU8XrYZJEaGU/D+UK4ATz6bzv11ZescDctsWm1LubLwQ2RPOfAFPCYI0MYFPamw+py6/3JP0w5uBETrUy+izNraQ560bnqC4fMjkmy+KpuTUk+YFW1JPgPbpw960ZoEhfSBWpRiSpMJ3EByn04xxkumzpAvaffN2JDpypHn2jyAmMqiWHTacJC2Maz/X9KpT5Tj+0UkE9lQBVicG1oQKBgQDykEEwqD4r63O5Wi0JY02WdFOEDaeYIgX9FA7wsIfCrLlJIVOR2HXUL+f0qK8MeddvL/dnIkp+0pndO9RQYb66L9lWBstEJ/gUHuTe7SS05r9bOwMEH8l3V4u0qAqn/O0ZKmPUJhrATsUY2gUitvtE8hOeE1bh2g77vPAMsAff5QKBgQDIU8emmyXGVk/SDlkiyYB1VPLuEkj1KHuuSz9Xmvu9xG3dXZzB15MsN0zycUEUsgylJTqvPJ/It+o2Mvk7A1IZsO9xQUI9JdHKqWS4rwaDvtcKrAxxYf2RJQmvsrkAoSKNe9Oe2FCTu+rdTFe6eZxYdPebMn9rPUjFoNGV/44yyQKBgB5Zfk6gPmcwZqJibhAmpKaWl3yGWNnoJ+eqgtQKwnHROr2ztckh1FxgQh2SnZRqClKXJdV5rOiBYU8VFVOZZ0vUgNUKtJQqjBe4ZdqewWEBHiBEGfSCJasRASHxhKPQObpUW3lH60D0miSp4sqdKoNN5rZ4pP5NUmKdGUv9Gn8hAoGAW/qzwdicqIt6zNzPqnxQog7mF8+HdiEnYKimJchAbCpjs29HCW284mFl0C+WDTWPPshwQIOabeOcA1S2QJVOvgMSfbLUAhV6VQ4f8/hRCm62d+z1LZ4redhCsUxjS1mw7rt7OATkQmDW/tMNuM4brjXOdpDiFlAmOK+Va8TR+pkCgYEAwKBtzUnY7dLJL8yI5feS+DkHvn4MdGMPGqcdug6mkTXOJZdxbvF99dt7czP8fYc5acB4wsSeKUlGjgzNxU7/c6Verao1jl3Yxl+UrMkoruZCf4HIMbYgtoSCbMaoH83/M3xqWtUBAVdYnikAjEptl0HM1nReB63uwT90Y4iSxsU=',
pubKey: 'CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC90BTfJW9ZDNic30Xkr4acCgEZWRmczeT/KVsecK98/qaTm4nvenHzuqXnh+CuBj1UKqHFjifTz6jy1oCSlJEEJgki0N/Vt/9Dkn/bn8Vjts/5M1ZlYbNfPJx6yEaWDClGz43rXtlHXKiwufPJ4dwPKQQZv4EshOEptAhO2913GB8D7/8bkaAlT+bwG+76jG5XkG9Pp0cHytOZWPBFYYRomOnAfDNRmbAK3lF0oyBXPuOd64AB9P/+wVrGrobKOZO5AiQkfBi0lYqx153tZ8CA5JxpPBLcmRoKxMA9Bmar7DjrVBi8fba1x4d3PufLzPwBIFVLV5mpkGMbtgQL74TNAgMBAAE='
const PEERINFO = {
id: 'QmctF7GPunpcLu3Fn77Ypo657TA9GpTiipSDUUMoD2k5Kq',
privKey: 'CAASpwkwggSjAgEAAoIBAQC90BTfJW9ZDNic30Xkr4acCgEZWRmczeT/KVsecK98/qaTm4nvenHzuqXnh+CuBj1UKqHFjifTz6jy1oCSlJEEJgki0N/Vt/9Dkn/bn8Vjts/5M1ZlYbNfPJx6yEaWDClGz43rXtlHXKiwufPJ4dwPKQQZv4EshOEptAhO2913GB8D7/8bkaAlT+bwG+76jG5XkG9Pp0cHytOZWPBFYYRomOnAfDNRmbAK3lF0oyBXPuOd64AB9P/+wVrGrobKOZO5AiQkfBi0lYqx153tZ8CA5JxpPBLcmRoKxMA9Bmar7DjrVBi8fba1x4d3PufLzPwBIFVLV5mpkGMbtgQL74TNAgMBAAECggEAAcCTAMBat8q7kS8qeQL5ziT1f6Nn7h+kdoqOMci+hfvf08sCyfgqZyKY93s0osah+E3wcl9ulLD9EUjTpQbEE/K58N1Ww6VQMPKARanC67m7T8Sejo8JVd68XxHMPQRduS6fU8XrYZJEaGU/D+UK4ATz6bzv11ZescDctsWm1LubLwQ2RPOfAFPCYI0MYFPamw+py6/3JP0w5uBETrUy+izNraQ560bnqC4fMjkmy+KpuTUk+YFW1JPgPbpw960ZoEhfSBWpRiSpMJ3EByn04xxkumzpAvaffN2JDpypHn2jyAmMqiWHTacJC2Maz/X9KpT5Tj+0UkE9lQBVicG1oQKBgQDykEEwqD4r63O5Wi0JY02WdFOEDaeYIgX9FA7wsIfCrLlJIVOR2HXUL+f0qK8MeddvL/dnIkp+0pndO9RQYb66L9lWBstEJ/gUHuTe7SS05r9bOwMEH8l3V4u0qAqn/O0ZKmPUJhrATsUY2gUitvtE8hOeE1bh2g77vPAMsAff5QKBgQDIU8emmyXGVk/SDlkiyYB1VPLuEkj1KHuuSz9Xmvu9xG3dXZzB15MsN0zycUEUsgylJTqvPJ/It+o2Mvk7A1IZsO9xQUI9JdHKqWS4rwaDvtcKrAxxYf2RJQmvsrkAoSKNe9Oe2FCTu+rdTFe6eZxYdPebMn9rPUjFoNGV/44yyQKBgB5Zfk6gPmcwZqJibhAmpKaWl3yGWNnoJ+eqgtQKwnHROr2ztckh1FxgQh2SnZRqClKXJdV5rOiBYU8VFVOZZ0vUgNUKtJQqjBe4ZdqewWEBHiBEGfSCJasRASHxhKPQObpUW3lH60D0miSp4sqdKoNN5rZ4pP5NUmKdGUv9Gn8hAoGAW/qzwdicqIt6zNzPqnxQog7mF8+HdiEnYKimJchAbCpjs29HCW284mFl0C+WDTWPPshwQIOabeOcA1S2QJVOvgMSfbLUAhV6VQ4f8/hRCm62d+z1LZ4redhCsUxjS1mw7rt7OATkQmDW/tMNuM4brjXOdpDiFlAmOK+Va8TR+pkCgYEAwKBtzUnY7dLJL8yI5feS+DkHvn4MdGMPGqcdug6mkTXOJZdxbvF99dt7czP8fYc5acB4wsSeKUlGjgzNxU7/c6Verao1jl3Yxl+UrMkoruZCf4HIMbYgtoSCbMaoH83/M3xqWtUBAVdYnikAjEptl0HM1nReB63uwT90Y4iSxsU=',
pubKey: 'CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC90BTfJW9ZDNic30Xkr4acCgEZWRmczeT/KVsecK98/qaTm4nvenHzuqXnh+CuBj1UKqHFjifTz6jy1oCSlJEEJgki0N/Vt/9Dkn/bn8Vjts/5M1ZlYbNfPJx6yEaWDClGz43rXtlHXKiwufPJ4dwPKQQZv4EshOEptAhO2913GB8D7/8bkaAlT+bwG+76jG5XkG9Pp0cHytOZWPBFYYRomOnAfDNRmbAK3lF0oyBXPuOd64AB9P/+wVrGrobKOZO5AiQkfBi0lYqx153tZ8CA5JxpPBLcmRoKxMA9Bmar7DjrVBi8fba1x4d3PufLzPwBIFVLV5mpkGMbtgQL74TNAgMBAAE='
}
export class Lib2pService {
@ -21,8 +25,58 @@ export class Lib2pService {
libp2p: Libp2p
constructor(self) {
this.self = self;
this.self = self;
this.connectionHandler = this.connectionHandler.bind(this)
}
async subscribeClient() {
}
async connectionHandler({ connection, stream, protocol }) {
console.log(connection)
const pushable = Pushable()
pipe(pushable, stream.sink)
let listener;
for await (const item of stream.source) {
const decodedMessage = decode(item._bufs[0])
console.log(decodedMessage)
if(decodedMessage.type === MESSAGE_TYPES.SUBSCRIBE_UPDATE) {
listener = this.self.encoder.events.on('job.progress', (streamId, statusUpdate) => {
console.log('rxl is receiving update')
console.log(streamId, decodedMessage.streamId)
if(streamId === decodedMessage.streamId) {
console.log(statusUpdate)
pushable.push(encode({
type: MESSAGE_TYPES.STATUS_UPDATE,
statusUpdate
}))
}
})
}
if(decodedMessage.type === MESSAGE_TYPES.UNSUBSCRIBE_UPDATE) {
}
if(decodedMessage.type === MESSAGE_TYPES.REQUEST_ENCODE_JOB) {
console.log(stream)
console.log(connection)
const data = await this.self.encoder.createJob(decodedMessage.ipfsHash, connection.remotePeer.toString())
pushable.push(encode({
type: MESSAGE_TYPES.RESPONE_ENCODE_JOB,
streamId: data.streamId
}))
this.self.encoder.executeJob(data.streamId)
}
}
//this.self.encoder.events.off
//clear event listeners
console.log('stream is ending')
}
async start() {
const idListener = await PeerId.createFromJSON(PEERINFO)
console.log('P2P interface starting up')
@ -36,7 +90,7 @@ export class Lib2pService {
dht: DHT
},
addresses: {
listen: ['/ip4/10.0.1.188/tcp/1445/']
listen: ['/ip4/10.0.1.188/tcp/14445/']
},
config: {
peerDiscovery: {
@ -85,7 +139,7 @@ export class Lib2pService {
}
}
this.libp2p.handle('/spk-video-encoder/1.0.0', handler)
this.libp2p.handle('/spk-video-encoder/1.0.0', this.connectionHandler)
this.libp2p.dialProtocol(this.libp2p.peerId, '/spk-video-encoder/1.0.0')
}
}

View File

@ -1,10 +1,18 @@
enum VIDEO_FORMATS {
mp4 = 'mp4'
}
/**
* Sends an encoding job to server
*/
interface ASK_ENCODE_JOB {
export interface REQ_ENCODE_JOB {
format?: VIDEO_FORMATS
ipfsHash: string
}
export interface RET_ENCODE_JOB {
streamId: string
}
interface STATUS_UPDATE {
@ -20,4 +28,46 @@ interface SUBSCRIBE_UPDATE {
interface UNSUBSCRIBE_UPDATE {
id: string
}
/**
* Designates whether the encoder can accept jobs.
* This is most applicable when the encoder node is overloaded or performing a graceful shutdown (finishing all jobs before going offline)
*/
export enum JOB_RULE {
OPEN,
CLOSED
}
/**
* The policy of whether the encoder should accept a job.
*/
export enum JOB_POLICY {
OPEN, //Open, anyone can upload to the encoder
PAYWALL, //Payment is required. This won't be implemented initially.
PRIVATE //Only a select list of users can upload. Authentication required
}
export interface NODE_INFO {
job_policy: JOB_POLICY
job_rule: JOB_RULE
peerInfo: any //IPFS peerInfo
scripts: {
motd?: string, //General note for all users connecting to the encoder. Shown before encoding
upload_motd?: string //Notified to the user when they upload a video
short_policy?: string //A general policy for uploading/what to look out for
tos?: string //Terms of service. The legal stuff that an encoder may need to provide to the user
}
}
export enum MESSAGE_TYPES {
STATUS_UPDATE,
SUBSCRIBE_UPDATE,
UNSUBSCRIBE_UPDATE,
REQUEST_ENCODE_JOB,
RESPONE_ENCODE_JOB
}
export function supportedType(messageType) {
}