diff --git a/.gitignore b/.gitignore index 838b758..b2b2ecb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ node_modules build deps +output +.vscode diff --git a/examples/readable.js b/examples/readable.js new file mode 100644 index 0000000..8bab16e --- /dev/null +++ b/examples/readable.js @@ -0,0 +1,8 @@ +const fs = require('fs'); +const dest = fs.createWriteStream('./output'); +const { SRTReadStream } = require('../index.js'); + +const srt = new SRTReadStream('0.0.0.0', 1234); +srt.listen(readStream => { + readStream.pipe(dest); +}); \ No newline at end of file diff --git a/index.js b/index.js index 48ae9f9..783aa2b 100644 --- a/index.js +++ b/index.js @@ -1,7 +1,9 @@ const LIB = require('./build/Release/node_srt.node'); const Server = require('./src/server.js'); +const { SRTReadStream } = require('./src/stream.js'); module.exports = { SRT: LIB.SRT, - Server: Server + Server: Server, + SRTReadStream } \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index 3680d5f..e7d582a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -156,6 +156,14 @@ "assert-plus": "^1.0.0" } }, + "debug": { + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.1.1.tgz", + "integrity": "sha512-pYAIzeRo8J6KPEaJ0VWOh5Pzkbw/RetuzehGM7QRRX5he4fPHx2rdKMB256ehJCkX+XRQm16eZLqLNS8RSZXZw==", + "requires": { + "ms": "^2.1.1" + } + }, "delayed-stream": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz", @@ -457,6 +465,11 @@ "integrity": "sha512-vVqVZQyf3WLx2Shd0qJ9xuvqgAyKPLAiqITEtqW0oIUjzo3PePDd6fW9iFz30ef7Ysp/oiWqbhszeGWW2T6Gzw==", "dev": true }, + "ms": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" + }, "node-addon-api": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-3.0.0.tgz", diff --git a/package.json b/package.json index 230ba3a..48d215f 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,7 @@ "scripts": { "preinstall": "npm install git-clone && node scripts/build-srt-sdk.js", "install": "node-gyp rebuild -j 8", + "rebuild": "node-gyp rebuild", "clean": "node-gyp clean" }, "repository": { @@ -22,6 +23,7 @@ "node-gyp": "^7.0.0" }, "dependencies": { + "debug": "^4.1.1", "git-clone": "^0.1.0", "node-addon-api": "^3.0.0" } diff --git a/src/srt-enums.h b/src/srt-enums.h new file mode 100644 index 0000000..e69de29 diff --git a/src/stream.js b/src/stream.js new file mode 100644 index 0000000..ef624fd --- /dev/null +++ b/src/stream.js @@ -0,0 +1,60 @@ +const { Readable } = require('stream'); +const LIB = require('../build/Release/node_srt.node'); +const debug = require('debug')('srt-stream'); + +/** + * Example: + * + * const dest = fs.createWritableStream('./output'); + * + * const srt = new SRTReadStream('0.0.0.0', 1234); + * srt.listen(readStream => { + * readStream.pipe(dest); + * }) + * + */ +class SRTReadStream extends Readable { + constructor(address, port, opts) { + super(); + this.srt = new LIB.SRT(); + this.socket = this.srt.createSocket(); + this.address = address; + this.port = port; + } + + listen(cb) { + this.srt.bind(this.socket, this.address, this.port); + this.srt.listen(this.socket, 2); + debug("Waiting for client"); + this.fd = this.srt.accept(this.socket); + if (this.fd) { + cb(this); + } + } + + connect(cb) { + this.fd = this.srt.connect(this.socket, address, port); + if (this.fd) { + cb(this); + } + } + + _read(size) { + let chunk; + try { + chunk = this.srt.read(this.fd, size); + debug(`Read chunk ${chunk.length}`); + while (!this.push(chunk)) { + debug(`Read chunk ${chunk.length}`); + chunk = this.srt.read(this.fd, size); + } + } catch (exc) { + debug(exc.message); + this.push(null); + } + } +} + +module.exports = { + SRTReadStream +} \ No newline at end of file