* replace stream module by improved version of readable/writable impl * rm server.js * async api improvments: - better tracing of calls from worker back and forth - fix transferrable handling to avoid copying buffers for r/w - optional debug logs - completed jsdocs annotations - add dispose method - add setLogLevel method (analoguous to added binding) * node-srt C bindings: - add SetLogLevel to get libSRT log output if desired - add OK static member - add #define EPOLL_EVENTS_NUM_MAX 1024 - improve error string thrown in Read (add that it comes from srt_recvmsg) - improve error string thrown in Write (add that it comes from srt_sendmsg2) - misc isofunctional improvements (var names) and comments * add SRT logging related JS-side helper * rewrite flat TypeScript decl files without "module" keyword * add ts enum decl for all libSRT enums * async-worker: enable using transferrable for zero-copy + allow better debugging (like in api/dispatcher side) + misc improvements on code quality * add async-helpers: various functions to help dealing with transferrables + tracing calls to native bindings in debug output * add async read/write modes functions + async-reader-writer class - these will allow for performing high-level r/w operations conveniently at optimum throughput for larger pieces of payload i.e list of packets. * add srt-server and srt-connection (can manage multiple clients), - based on async-api - can be used with reader/writer (i.e the underlying modes) * srt-server/connection typings * async srt spec: add dispose method usage (but commented out as crashing atm) * async srt spec: rm redundant checks on SRT static members (they are done in other spec already) * promises api spec: formal fixes * stream spec: add dummy test * package.json: - put gyp toolchain in runtime deps (since the build happens on install) - add JEST test runner - shorten check-tsc script - rebuild script: check & use all CPU cores available - run rebuild actually on install, not preinstall (fixes deps not being there) - remove preinstall and thus "npm install git-clone" in the package scripts * update package lock * update typings index not to need triple-slashs anymore * in srt.ts example: check for read return value type * build-srt-sdk script: - allow to use any local libSRT code repo - when using make: use all amount of cores available for build - isolate better code running on different platforms * update package main index with new things * add enum typings index * add jest config * add "use strict" on async-srt-await example * add integration/smoke testing for client-to-server one-way burst write * readme: add note on build prerequisites * readme: add infos on new components SRTServer/Connection & AsyncReaderWriter
322 lines
8.2 KiB
JavaScript
322 lines
8.2 KiB
JavaScript
const { AsyncSRT } = require('./async');
|
|
const { AsyncReaderWriter } = require('./async-reader-writer');
|
|
const { SRT } = require('../build/Release/node_srt.node');
|
|
|
|
const EventEmitter = require("events");
|
|
const debug = require('debug')('srt-server');
|
|
|
|
const DEBUG = false;
|
|
|
|
const EPOLL_PERIOD_MS_DEFAULT = 0;
|
|
|
|
const EPOLLUWAIT_TIMEOUT_MS = 0;
|
|
|
|
const SOCKET_LISTEN_BACKLOG = 128;
|
|
|
|
/**
|
|
* @emits data
|
|
* @emits closing
|
|
* @emits closed
|
|
*/
|
|
class SRTConnection extends EventEmitter {
|
|
/**
|
|
*
|
|
* @param {AsyncSRT} asyncSrt
|
|
* @param {number} fd
|
|
*/
|
|
constructor(asyncSrt, fd) {
|
|
super();
|
|
|
|
this._asyncSrt = asyncSrt;
|
|
this._fd = fd;
|
|
this._gotFirstData = false;
|
|
}
|
|
|
|
/**
|
|
* @returns {number}
|
|
*/
|
|
get fd() {
|
|
return this._fd;
|
|
}
|
|
|
|
/**
|
|
* Will be false until *after* emit of first `data` event.
|
|
* After that will be true.
|
|
*/
|
|
get gotFirstData() {
|
|
return this._gotFirstData;
|
|
}
|
|
|
|
/**
|
|
* @returns {AsyncReaderWriter}
|
|
*/
|
|
getReaderWriter() {
|
|
return new AsyncReaderWriter(this._asyncSrt, this.fd);
|
|
}
|
|
|
|
/**
|
|
*
|
|
* @param {number} bytes
|
|
* @returns {Promise<Buffer | SRTResult.SRT_ERROR | null>}
|
|
*/
|
|
async read(bytes) {
|
|
return await this._asyncSrt.read(this.fd, bytes);
|
|
}
|
|
|
|
/**
|
|
*
|
|
* Pass a packet buffer to write to the connection.
|
|
*
|
|
* The size of the buffer must not exceed the SRT payload MTU
|
|
* (usually 1316 bytes).
|
|
*
|
|
* Otherwise the call will resolve to SRT_ERROR.
|
|
*
|
|
* A system-specific socket-message error message may show in logs as enabled
|
|
* where the error is thrown (on the binding call to the native SRT API),
|
|
* and in the async API internals as it gets propagated back from the task-runner).
|
|
*
|
|
* Note that any underlying data buffer passed in
|
|
* will be *neutered* by our worker thread and
|
|
* therefore become unusable (i.e go to detached state, `byteLengh === 0`)
|
|
* for the calling thread of this method.
|
|
* When consuming from a larger piece of data,
|
|
* chunks written will need to be slice copies of the source buffer.
|
|
*
|
|
* @param {Buffer | Uint8Array} chunk
|
|
*/
|
|
async write(chunk) {
|
|
return await this._asyncSrt.write(this.fd, chunk);
|
|
}
|
|
|
|
/**
|
|
* @returns {Promise<SRTResult | null>}
|
|
*/
|
|
async close() {
|
|
if (this.isClosed()) return null;
|
|
const asyncSrt = this._asyncSrt;
|
|
this._asyncSrt = null;
|
|
this.emit('closing');
|
|
const result = await asyncSrt.close(this.fd);
|
|
this.emit('closed', result);
|
|
this.off();
|
|
return result;
|
|
}
|
|
|
|
isClosed() {
|
|
return ! this._asyncSrt;
|
|
}
|
|
|
|
onData() {
|
|
this.emit('data');
|
|
if (!this.gotFirstData) {
|
|
this._gotFirstData = true;
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @emits created
|
|
* @emits opened
|
|
* @emits connection
|
|
* @emits disconnection
|
|
* @emits disposed
|
|
*/
|
|
class SRTServer extends EventEmitter {
|
|
|
|
/**
|
|
*
|
|
* @param {number} port socket port number
|
|
* @param {string} address optional, default: '0.0.0.0'
|
|
* @param {number} epollPeriodMs optional, default: EPOLL_PERIOD_MS_DEFAULT
|
|
* @returns {Promise<SRTServer>}
|
|
*/
|
|
static create(port, address, epollPeriodMs) {
|
|
return new SRTServer(port, address, epollPeriodMs).create();
|
|
}
|
|
|
|
/**
|
|
*
|
|
* @param {number} port socket port number
|
|
* @param {string} address optional, default: '0.0.0.0'
|
|
* @param {number} epollPeriodMs optional, default: EPOLL_PERIOD_MS_DEFAULT
|
|
*/
|
|
constructor(port, address = '0.0.0.0', epollPeriodMs = EPOLL_PERIOD_MS_DEFAULT) {
|
|
super();
|
|
|
|
if (!Number.isInteger(port) || port <= 0 || port > 65535)
|
|
throw new Error('Need a valid port number but got: ' + port);
|
|
|
|
this.port = port;
|
|
this.address = address;
|
|
this.epollPeriodMs = epollPeriodMs;
|
|
this.socket = null;
|
|
this.epid = null;
|
|
|
|
this._pollEventsTimer = null;
|
|
this._asyncSrt = new AsyncSRT();
|
|
this._connectionMap = {};
|
|
}
|
|
|
|
async dispose() {
|
|
clearTimeout(this._pollEventsTimer);
|
|
await this._asyncSrt.close(this.socket);
|
|
this.socket = null;
|
|
const res = await this._asyncSrt.dispose();
|
|
this._asyncSrt = null;
|
|
this.emit('disposed');
|
|
return res;
|
|
}
|
|
|
|
/**
|
|
* Call this before `open`.
|
|
* Call `setSocketFlags` after this.
|
|
*
|
|
* @return {Promise<SRTServer>}
|
|
*/
|
|
async create() {
|
|
this.socket = await this._asyncSrt.createSocket();
|
|
this.emit('created');
|
|
return this;
|
|
}
|
|
|
|
/**
|
|
* Call this after `create`.
|
|
* Call `setSocketFlags` before calling this.
|
|
*
|
|
* @return {Promise<SRTServer>}
|
|
*/
|
|
async open() {
|
|
let result;
|
|
result = await this._asyncSrt.bind(this.socket, this.address, this.port);
|
|
if (result === SRT.ERROR) {
|
|
throw new Error('SRT.bind() failed');
|
|
}
|
|
result = await this._asyncSrt.listen(this.socket, SOCKET_LISTEN_BACKLOG);
|
|
if (result === SRT.ERROR) {
|
|
throw new Error('SRT.listen() failed');
|
|
}
|
|
result = await this._asyncSrt.epollCreate();
|
|
if (result === SRT.ERROR) {
|
|
throw new Error('SRT.epollCreate() failed');
|
|
}
|
|
this.epid = result;
|
|
|
|
this.emit('opened');
|
|
|
|
// we should await the epoll subscribe result before continuing
|
|
// since it is useless to poll events otherwise
|
|
// and we should also yield from the stack at this point
|
|
// since the `opened` event handlers above may do whatever
|
|
await this._asyncSrt.epollAddUsock(this.epid, this.socket, SRT.EPOLL_IN | SRT.EPOLL_ERR);
|
|
|
|
this._pollEvents();
|
|
|
|
return this;
|
|
}
|
|
|
|
/**
|
|
*
|
|
* @param {SRTSockOpt[]} opts
|
|
* @param {SRTSockOptValue[]} values
|
|
* @returns {Promise<SRTResult[]>}
|
|
*/
|
|
async setSocketFlags(opts, values) {
|
|
if (opts.length !== values.length)
|
|
throw new Error('opts and values must have same length');
|
|
const promises = [];
|
|
opts.forEach((opt, index) => {
|
|
const p = this._asyncSrt.setSockOpt(this.socket, opt, values[index]);
|
|
promises.push(p);
|
|
})
|
|
return Promise.all(promises);
|
|
}
|
|
|
|
/**
|
|
*
|
|
* @param {number} fd
|
|
* @returns {SRTConnection | null}
|
|
*/
|
|
getConnectionByHandle(fd) {
|
|
return this._connectionMap[fd] || null;
|
|
}
|
|
|
|
/**
|
|
* @returns {Array<SRTConnection>}
|
|
*/
|
|
getAllConnections() {
|
|
return Array.from(Object.values(this._connectionMap));
|
|
}
|
|
|
|
/**
|
|
* @private
|
|
* @param {SRTEpollEvent} event
|
|
*/
|
|
async _handleEvent(event) {
|
|
const status = await this._asyncSrt.getSockState(event.socket);
|
|
|
|
// our local listener socket
|
|
if (event.socket === this.socket) {
|
|
|
|
if (status === SRT.SRTS_LISTENING) {
|
|
const fd = await this._asyncSrt.accept(this.socket);
|
|
// no need to await the epoll subscribe result before continuing
|
|
this._asyncSrt.epollAddUsock(this.epid, fd, SRT.EPOLL_IN | SRT.EPOLL_ERR);
|
|
debug("Accepted client connection with file-descriptor:", fd);
|
|
// create new client connection handle
|
|
// and emit accept event
|
|
const connection = new SRTConnection(this._asyncSrt, fd);
|
|
connection.on('closing', () => {
|
|
// remove handle
|
|
delete this._connectionMap[fd];
|
|
});
|
|
this._connectionMap[fd] = connection;
|
|
this.emit('connection', connection);
|
|
}
|
|
|
|
// a client socket / fd
|
|
// check if broken or closed
|
|
} else if (status === SRT.SRTS_BROKEN
|
|
|| status === SRT.SRTS_NONEXIST
|
|
|| status === SRT.SRTS_CLOSED) {
|
|
const fd = event.socket;
|
|
debug("Client disconnected on fd:", fd);
|
|
if (this._connectionMap[fd]) {
|
|
await this._connectionMap[fd].close();
|
|
this.emit('disconnection', fd);
|
|
}
|
|
// not broken, just new data
|
|
} else {
|
|
const fd = event.socket;
|
|
DEBUG && debug("Got data from connection on fd:", fd);
|
|
const connection = this.getConnectionByHandle(fd);
|
|
if (!connection) {
|
|
console.warn("Got event for fd not in connections map:", fd);
|
|
return;
|
|
}
|
|
connection.onData();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @private
|
|
*/
|
|
async _pollEvents() {
|
|
const events = await this._asyncSrt.epollUWait(this.epid, EPOLLUWAIT_TIMEOUT_MS);
|
|
events.forEach((event) => {
|
|
this._handleEvent(event);
|
|
});
|
|
|
|
// clearing in case we get called multiple times
|
|
// when already timer scheduled
|
|
// will be no-op if timer-id invalid or old
|
|
clearTimeout(this._pollEventsTimer);
|
|
this._pollEventsTimer
|
|
= setTimeout(this._pollEvents.bind(this), this.epollPeriodMs)
|
|
}
|
|
}
|
|
|
|
module.exports = {
|
|
SRTConnection,
|
|
SRTServer
|
|
};
|