本章内容解读SRS开源代码框架，无二次开发，以学习交流为目的。
SRS是国人开发的流媒体服务器，C++语言开发，本章使用版本：https://github.com/ossrs/srs/tree/5.0release。
目录：SRS协程库ST的使用、源码、ST协程库测试、SrsAutoFree测试
SRS协程库ST的使用
C语言协程库state-threads，简称ST库：https://sourceforge.net/projects/state-threads/。SRS对state-threads库进行了二次开发：https://github.com/ossrs/state-threads。
1、ST库的编译：在下载的srs-5.0release.zip安装包里有ST源码，直接编译：
cd /srs-5.0release/trunk/3rdparty/st-srs
make linux-debug #编译在构建目录生成库文件libst.a头文件st.h。
2、ST库的使用：SRS封装了协程类SrsSTCoroutine，通过C++类的继承和虚函数回调，实现了在回调函数中执行协程处理函数，和linux线程库函数pthread_create用法类似。
这部分代码还包含了SrsAutoFree定义，可以在离开作用域时自动释放指针，也是很有用的一个模块。
源码
源码结构如下：
├── chw_adapt.h
├── srs_app_st.cpp
├── srs_app_st.hpp
├── srs_kernel_error.cpp
├── srs_kernel_error.hpp
├── srs_kernel_io.cpp
├── srs_kernel_io.hpp
├── srs_protocol_io.cpp
├── srs_protocol_io.hpp
├── srs_protocol_st.cpp
└── srs_protocol_st.hpp
其中srs_kernel_io、srs_protocol_io和SRS源码一样，可以在SRS源码里找；srs_kernel_error源码参考这里：SRS开源代码框架，错误类(SrsCplxError)的使用。
日志打印使用printf代替，上下文SrsContextId使用std::string代替。
chw_adapt.h
#ifndef CHW_ADAPT_H
#define CHW_ADAPT_H

#include <stdint.h>
#include <stdlib.h>

#include <string>

// To reduce dependencies, the context id is a plain std::string.
typedef std::string SrsContextId;

// Time unit used throughout: microseconds.
typedef int64_t srs_utime_t;
#define SRS_UTIME_MILLISECONDS 1000
#define srsu2ms(us) ((us) / SRS_UTIME_MILLISECONDS)
#define srsu2msi(us) int((us) / SRS_UTIME_MILLISECONDS)

// Never timeout.
#define SRS_UTIME_NO_TIMEOUT ((srs_utime_t) -1LL)

// To delete object.
#define SrsAutoFree(className, instance) \
    impl_SrsAutoFree<className> _auto_free_##instance(&instance, false, false, NULL)
// To delete array.
#define SrsAutoFreeA(className, instance) \
    impl_SrsAutoFree<className> _auto_free_array_##instance(&instance, true, false, NULL)
// Use free instead of delete.
#define SrsAutoFreeF(className, instance) \
    impl_SrsAutoFree<className> _auto_free_##instance(&instance, false, true, NULL)
// Use hook instead of delete.
#define SrsAutoFreeH(className, instance, hook) \
    impl_SrsAutoFree<className> _auto_free_##instance(&instance, false, false, hook)

// The template implementation: a scope guard that holds the ADDRESS of a
// pointer variable and releases whatever it points to when the guard is
// destroyed, then resets the variable to NULL.
template<class T>
class impl_SrsAutoFree
{
private:
    T** ptr;
    bool is_array;
    bool _use_free;
    void (*_hook)(T*);
public:
    // If use_free, use free(void*) to release the p.
    // If specified hook, use hook(p) to release it.
    // Use delete to release p, or delete[] if p is an array.
    impl_SrsAutoFree(T** p, bool array, bool use_free, void (*hook)(T*)) {
        ptr = p;
        is_array = array;
        _use_free = use_free;
        _hook = hook;
    }

    virtual ~impl_SrsAutoFree() {
        // Nothing to release when the guard or the guarded pointer is NULL.
        if (ptr == NULL || *ptr == NULL) {
            return;
        }

        if (_use_free) {
            free(*ptr);
        } else if (_hook) {
            _hook(*ptr);
        } else {
            if (is_array) {
                delete[] *ptr;
            } else {
                delete *ptr;
            }
        }

        *ptr = NULL;
    }
};

#endif // CHW_ADAPT_H
// srs_app_st.hpp
#ifndef SRS_APP_ST_HPP
#define SRS_APP_ST_HPP#include string
#include chw_adapt.h#include srs_kernel_error.hpp
#include srs_protocol_st.hppclass SrsFastCoroutine;
// 每个协程都要继承这个类
class ISrsCoroutineHandler
{
public:ISrsCoroutineHandler();virtual ~ISrsCoroutineHandler();
public:// Do the work. The ST-coroutine will terminated normally if it returned.// remark If the cycle has its own loop, it must check the thread pull.// 协程处理函数如果返回则协程结束virtual srs_error_t cycle() 0;
};// Start the object, generally a croutine.
// 通常是启动一个ST对象
class ISrsStartable
{
public:ISrsStartable();virtual ~ISrsStartable();
public:virtual srs_error_t start() 0;
};// The corotine object.
// 协程基类
class SrsCoroutine : public ISrsStartable
{
public:SrsCoroutine();virtual ~SrsCoroutine();
public:virtual void stop() 0;virtual void interrupt() 0;// return a copy of error, which should be freed by user.// NULL if not terminated and user should pull again.virtual srs_error_t pull() 0;// Get and set the context id of coroutine.virtual const SrsContextId cid() 0;virtual void set_cid(const SrsContextId cid) 0;
};// An empty coroutine, user can default to this object before create any real coroutine.
// see https://github.com/ossrs/srs/pull/908
// 一个空的协程用户可以在创建任何真正的协程序之前默认为这个对象。
class SrsDummyCoroutine : public SrsCoroutine
{
private:SrsContextId cid_;
public:SrsDummyCoroutine();virtual ~SrsDummyCoroutine();
public:virtual srs_error_t start();virtual void stop();virtual void interrupt();virtual srs_error_t pull();virtual const SrsContextId cid();virtual void set_cid(const SrsContextId cid);
};// A ST-coroutine is a lightweight thread, just like the goroutine.
// But the goroutine maybe run on different thread, while ST-coroutine only
// run in single thread, because it use setjmp and longjmp, so it may cause
// problem in multiple threads. For SRS, we only use single thread module,
// like NGINX to get very high performance, with asynchronous and non-blocking
// sockets.
// ST-coroutine是一个轻量级的线程就像goroutine一样。
// 但是goroutine可能在不同的线程上运行而ST-coroutine只在单个线程中运行因为它使用了setjmp和longjmp所以它可能会在多个线程中导致问题。
// 对于SRS我们只使用单线程模块类似NGINX来获得非常高的性能具有异步和非阻塞套接字。
// reamrk For multiple processes, please use go-oryx to fork many SRS processes.
// 对于多个进程请使用go-oryx来fork多个SRS进程。
// Please read https://github.com/ossrs/go-oryx
// remark For debugging of ST-coroutine, read _st_iterate_threads_flag of ST/README
// https://github.com/ossrs/state-threads/blob/st-1.9/README#L115
// remark We always create joinable thread, so we must join it or memory leak,
// Please read https://github.com/ossrs/srs/issues/78
class SrsSTCoroutine : public SrsCoroutine
{
private:SrsFastCoroutine* impl_;
public:// Create a thread with name n and handler h.// remark User can specify a cid for thread to use, or we will allocate a new one.SrsSTCoroutine(std::string n, ISrsCoroutineHandler* h);SrsSTCoroutine(std::string n, ISrsCoroutineHandler* h, SrsContextId cid);virtual ~SrsSTCoroutine();
public:// Set the stack size of coroutine, default to 0(64KB).void set_stack_size(int v);
public:// Start the thread.// remark Should never start it when stopped or terminated.virtual srs_error_t start();// Interrupt the thread then wait to terminated.// remark If user want to notify thread to quit async, for example if there are// many threads to stop like the encoder, use the interrupt to notify all threads// to terminate then use stop to wait for each to terminate.virtual void stop();// Interrupt the thread and notify it to terminate, it will be wakeup if its blocked// in some IO operations, such as st_read or st_write, then it will found should quit,// finally the thread should terminated normally, user can use the stop to join it.virtual void interrupt();// Check whether thread is terminated normally or error(stopped or termianted with error),// and the thread should be running if it return ERROR_SUCCESS.// remark Return specified error when thread terminated normally with error.// remark Return ERROR_THREAD_TERMINATED when thread terminated normally without error.// remark Return ERROR_THREAD_INTERRUPED when thread is interrupted.virtual srs_error_t pull();// Get and set the context id of thread.virtual const SrsContextId cid();virtual void set_cid(const SrsContextId cid);
};// High performance coroutine.
// 高性能协程
class SrsFastCoroutine
{
private:std::string name;int stack_size;ISrsCoroutineHandler* handler;
private:srs_thread_t trd;SrsContextId cid_;srs_error_t trd_err;
private:bool started;bool interrupted;bool disposed;// Cycle done, no need to interrupt it.bool cycle_done;
private:// Sub state in disposed, we need to wait for thread to quit.// 子状态被处理后我们需要等待线程退出。bool stopping_;SrsContextId stopping_cid_;
public:SrsFastCoroutine(std::string n, ISrsCoroutineHandler* h);SrsFastCoroutine(std::string n, ISrsCoroutineHandler* h, SrsContextId cid);virtual ~SrsFastCoroutine();
public:void set_stack_size(int v);
public:srs_error_t start();void stop();void interrupt();inline srs_error_t pull() {if (trd_err srs_success) {return srs_success;}return srs_error_copy(trd_err);}const SrsContextId cid();virtual void set_cid(const SrsContextId cid);
private:srs_error_t cycle();static void* pfn(void* arg);
};// Like goroutine sync.WaitGroup.
// 类似go语言的sync.WaitGroup
class SrsWaitGroup
{
private:int nn_;srs_cond_t done_;
public:SrsWaitGroup();virtual ~SrsWaitGroup();
public:// When start for n coroutines.void add(int n);// When coroutine is done.void done();// Wait for all corotine to be done.void wait();
};#endif
srs_app_st.cpp
#include srs_app_st.hpp#include string
using namespace std;#include srs_kernel_error.hppISrsCoroutineHandler::ISrsCoroutineHandler()
{
}ISrsCoroutineHandler::~ISrsCoroutineHandler()
{
}ISrsStartable::ISrsStartable()
{
}ISrsStartable::~ISrsStartable()
{
}SrsCoroutine::SrsCoroutine()
{
}SrsCoroutine::~SrsCoroutine()
{
}SrsDummyCoroutine::SrsDummyCoroutine()
{
}SrsDummyCoroutine::~SrsDummyCoroutine()
{
}srs_error_t SrsDummyCoroutine::start()
{return srs_error_new(ERROR_THREAD, dummy coroutine);
}void SrsDummyCoroutine::stop()
{
}void SrsDummyCoroutine::interrupt()
{
}srs_error_t SrsDummyCoroutine::pull()
{return srs_error_new(ERROR_THREAD, dummy pull);
}const SrsContextId SrsDummyCoroutine::cid()
{return cid_;
}void SrsDummyCoroutine::set_cid(const SrsContextId cid)
{cid_ cid;
}SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h)
{impl_ new SrsFastCoroutine(n, h);
}SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h, SrsContextId cid)
{impl_ new SrsFastCoroutine(n, h, cid);
}SrsSTCoroutine::~SrsSTCoroutine()
{srs_freep(impl_);
}void SrsSTCoroutine::set_stack_size(int v)
{impl_-set_stack_size(v);
}srs_error_t SrsSTCoroutine::start()
{return impl_-start();
}void SrsSTCoroutine::stop()
{impl_-stop();
}void SrsSTCoroutine::interrupt()
{impl_-interrupt();
}srs_error_t SrsSTCoroutine::pull()
{return impl_-pull();
}const SrsContextId SrsSTCoroutine::cid()
{return impl_-cid();
}void SrsSTCoroutine::set_cid(const SrsContextId cid)
{impl_-set_cid(cid);
}SrsFastCoroutine::SrsFastCoroutine(string n, ISrsCoroutineHandler* h)
{// TODO: FIXME: Reduce duplicated code.name n;handler h;trd NULL;trd_err srs_success;started interrupted disposed cycle_done false;stopping_ false;// 0 use default, default is 64K.stack_size 0;
}SrsFastCoroutine::SrsFastCoroutine(string n, ISrsCoroutineHandler* h, SrsContextId cid)
{name n;handler h;cid_ cid;trd NULL;trd_err srs_success;started interrupted disposed cycle_done false;stopping_ false;// 0 use default, default is 64K.stack_size 0;
}SrsFastCoroutine::~SrsFastCoroutine()
{stop();// TODO: FIXME: We must assert the cycle is done.srs_freep(trd_err);
}void SrsFastCoroutine::set_stack_size(int v)
{stack_size v;
}srs_error_t SrsFastCoroutine::start()
{srs_error_t err srs_success;if (started || disposed) {if (disposed) {err srs_error_new(ERROR_THREAD, disposed);} else {err srs_error_new(ERROR_THREAD, started);}if (trd_err srs_success) {trd_err srs_error_copy(err);}return err;}if ((trd (srs_thread_t)_pfn_st_thread_create(pfn, this, 1, stack_size)) NULL) {err srs_error_new(ERROR_THREAD, create failed);srs_freep(trd_err);trd_err srs_error_copy(err);return err;}started true;return err;
}void SrsFastCoroutine::stop()
{if (disposed) {if (stopping_) {/*srs_error*/printf(thread is stopping by %s\n, stopping_cid_.c_str());srs_assert(!stopping_);}return;}disposed true;stopping_ true;interrupt();// When not started, the trd is NULL.if (trd) {void* res NULL;int r0 srs_thread_join(trd, res);if (r0) {// By st_thread_joinif (errno EINVAL) srs_assert(!r0);if (errno EDEADLK) srs_assert(!r0);// By st_cond_timedwaitif (errno EINTR) srs_assert(!r0);if (errno ETIME) srs_assert(!r0);// Otherssrs_assert(!r0);}srs_error_t err_res (srs_error_t)res;if (err_res ! srs_success) {// When worker cycle done, the error has already been overrided,// so the trd_err should be equal to err_res.srs_assert(trd_err err_res);}}// If theres no error occur from worker, try to set to terminated error.if (trd_err srs_success !cycle_done) {trd_err srs_error_new(ERROR_THREAD, terminated);}// Now, weare stopped.stopping_ false;return;
}void SrsFastCoroutine::interrupt()
{if (!started || interrupted || cycle_done) {return;}interrupted true;if (trd_err srs_success) {trd_err srs_error_new(ERROR_THREAD, interrupted);}// Note that if another thread is stopping thread and waiting in st_thread_join,// the interrupt will make the st_thread_join fail.srs_thread_interrupt(trd);
}const SrsContextId SrsFastCoroutine::cid()
{return cid_;
}void SrsFastCoroutine::set_cid(const SrsContextId cid)
{cid_ cid;
// srs_context_set_cid_of(trd, cid);
}srs_error_t SrsFastCoroutine::cycle()
{
// if (_srs_context) {
// if (cid_.empty()) {
// cid_ _srs_context-generate_id();
// }
// _srs_context-set_id(cid_);
// }srs_error_t err handler-cycle();if (err ! srs_success) {return srs_error_wrap(err, coroutine cycle);}// Set cycle done, no need to interrupt it.cycle_done true;return err;
}void* SrsFastCoroutine::pfn(void* arg)
{SrsFastCoroutine* p (SrsFastCoroutine*)arg;srs_error_t err p-cycle();// Set the err for function pull to fetch it.// see https://github.com/ossrs/srs/pull/1304#issuecomment-480484151if (err ! srs_success) {srs_freep(p-trd_err);// Its ok to directly use it, because its returned by st_thread_join.p-trd_err err;}return (void*)err;
}SrsWaitGroup::SrsWaitGroup()
{nn_ 0;done_ srs_cond_new();
}SrsWaitGroup::~SrsWaitGroup()
{wait();srs_cond_destroy(done_);
}void SrsWaitGroup::add(int n)
{nn_ n;
}void SrsWaitGroup::done()
{nn_--;if (nn_ 0) {srs_cond_signal(done_);}
}void SrsWaitGroup::wait()
{if (nn_ 0) {srs_cond_wait(done_);}
}srs_protocol_st.hpp
#ifndef SRS_PROTOCOL_ST_HPP
#define SRS_PROTOCOL_ST_HPP#include chw_adapt.h
#include string#include srs_kernel_error.hpp
#include srs_protocol_io.hpp// Wrap for coroutine.
typedef void* srs_netfd_t;
typedef void* srs_thread_t;
typedef void* srs_cond_t;
typedef void* srs_mutex_t;// Initialize ST, requires epoll for linux.
extern srs_error_t srs_st_init();
// Destroy ST, free resources for asan detecting.
extern void srs_st_destroy(void);// Close the netfd, and close the underlayer fd.
// remark when close, user must ensure io completed.
extern void srs_close_stfd(srs_netfd_t stfd);// Set the FD_CLOEXEC of FD.
extern srs_error_t srs_fd_closeexec(int fd);// Set the SO_REUSEADDR of fd.
extern srs_error_t srs_fd_reuseaddr(int fd);// Set the SO_REUSEPORT of fd.
extern srs_error_t srs_fd_reuseport(int fd);// Set the SO_KEEPALIVE of fd.
extern srs_error_t srs_fd_keepalive(int fd);// Get current coroutine/thread.
extern srs_thread_t srs_thread_self();
extern void srs_thread_exit(void* retval);
extern int srs_thread_join(srs_thread_t thread, void **retvalp);
extern void srs_thread_interrupt(srs_thread_t thread);
extern void srs_thread_yield();// For utest to mock the thread create.
typedef void* (*_ST_THREAD_CREATE_PFN)(void *(*start)(void *arg), void *arg, int joinable, int stack_size);
extern _ST_THREAD_CREATE_PFN _pfn_st_thread_create;// For client, to open socket and connect to server.
// param tm The timeout in srs_utime_t.
extern srs_error_t srs_tcp_connect(std::string server, int port, srs_utime_t tm, srs_netfd_t* pstfd);// For server, listen at TCP endpoint.
extern srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd);// For server, listen at UDP endpoint.
extern srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd);// Wrap for coroutine.
extern srs_cond_t srs_cond_new();
extern int srs_cond_destroy(srs_cond_t cond);
extern int srs_cond_wait(srs_cond_t cond);
extern int srs_cond_timedwait(srs_cond_t cond, srs_utime_t timeout);
extern int srs_cond_signal(srs_cond_t cond);
extern int srs_cond_broadcast(srs_cond_t cond);extern srs_mutex_t srs_mutex_new();
extern int srs_mutex_destroy(srs_mutex_t mutex);
extern int srs_mutex_lock(srs_mutex_t mutex);
extern int srs_mutex_unlock(srs_mutex_t mutex);extern int srs_key_create(int* keyp, void (*destructor)(void*));
extern int srs_thread_setspecific(int key, void* value);
extern int srs_thread_setspecific2(srs_thread_t thread, int key, void* value);
extern void* srs_thread_getspecific(int key);extern int srs_netfd_fileno(srs_netfd_t stfd);extern int srs_usleep(srs_utime_t usecs);extern srs_netfd_t srs_netfd_open_socket(int osfd);
extern srs_netfd_t srs_netfd_open(int osfd);extern int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout);
extern int srs_sendto(srs_netfd_t stfd, void *buf, int len, const struct sockaddr *to, int tolen, srs_utime_t timeout);
extern int srs_recvmsg(srs_netfd_t stfd, struct msghdr *msg, int flags, srs_utime_t timeout);
extern int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime_t timeout);extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout);extern ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout);extern bool srs_is_never_timeout(srs_utime_t tm);// The mutex locker.
#define SrsLocker(instance) \impl__SrsLocker _SRS_free_##instance(instance)class impl__SrsLocker
{
private:srs_mutex_t* lock;
public:impl__SrsLocker(srs_mutex_t* l) {lock l;int r0 srs_mutex_lock(*lock);srs_assert(!r0);}virtual ~impl__SrsLocker() {int r0 srs_mutex_unlock(*lock);srs_assert(!r0);}
};// the socket provides TCP socket over st,
// that is, the sync socket mechanism.
class SrsStSocket : public ISrsProtocolReadWriter
{
private:// The recv/send timeout in srs_utime_t.// remark Use SRS_UTIME_NO_TIMEOUT for never timeout.srs_utime_t rtm;srs_utime_t stm;// The recv/send data in bytesint64_t rbytes;int64_t sbytes;// The underlayer st fd.srs_netfd_t stfd_;
public:SrsStSocket();SrsStSocket(srs_netfd_t fd);virtual ~SrsStSocket();
private:void init(srs_netfd_t fd);
public:virtual void set_recv_timeout(srs_utime_t tm);virtual srs_utime_t get_recv_timeout();virtual void set_send_timeout(srs_utime_t tm);virtual srs_utime_t get_send_timeout();virtual int64_t get_recv_bytes();virtual int64_t get_send_bytes();
public:// param nread, the actual read bytes, ignore if NULL.virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);// param nwrite, the actual write bytes, ignore if NULL.virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};// The client to connect to server over TCP.
// User must never reuse the client when close it.
// Usage:
// SrsTcpClient client(127.0.0.1, 1935, 9 * SRS_UTIME_SECONDS);
// client.connect();
// client.write(Hello world!, 12, NULL);
// client.read(buf, 4096, NULL);
// remark User can directly free the object, which will close the fd.
class SrsTcpClient : public ISrsProtocolReadWriter
{
private:srs_netfd_t stfd_;SrsStSocket* io;
private:std::string host;int port;// The timeout in srs_utime_t.srs_utime_t timeout;
public:// Constructor.// param h the ip or hostname of server.// param p the port to connect to.// param tm the timeout in srs_utime_t.SrsTcpClient(std::string h, int p, srs_utime_t tm);virtual ~SrsTcpClient();
public:// Connect to server over TCP.// remark We will close the exists connection before do connect.virtual srs_error_t connect();
// Interface ISrsProtocolReadWriter
public:virtual void set_recv_timeout(srs_utime_t tm);virtual srs_utime_t get_recv_timeout();virtual void set_send_timeout(srs_utime_t tm);virtual srs_utime_t get_send_timeout();virtual int64_t get_recv_bytes();virtual int64_t get_send_bytes();virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};#endifsrs_protocol_st.cpp
#include srs_protocol_st.hpp#include st.h
#include fcntl.h
#include sys/socket.h
#include netdb.h
#include string.h
using namespace std;
#include srs_kernel_error.hpp// nginx also set to 512
#define SERVER_LISTEN_BACKLOG 512#ifdef __linux__
#include sys/epoll.hbool srs_st_epoll_is_supported(void)
{struct epoll_event ev;ev.events EPOLLIN;ev.data.ptr NULL;/* Guaranteed to fail */epoll_ctl(-1, EPOLL_CTL_ADD, -1, ev);return (errno ! ENOSYS);
}
#endifsrs_error_t srs_st_init()
{
#ifdef __linux__// check epoll, some old linux donot support epoll.if (!srs_st_epoll_is_supported()) {return srs_error_new(ERROR_THREAD, linux epoll disabled);}
#endif// Select the best event system available on the OS. In Linux this is// epoll(). On BSD it will be kqueue.
#if defined(SRS_CYGWIN64)if (st_set_eventsys(ST_EVENTSYS_SELECT) -1) {return srs_error_new(ERROR_ST_SET_SELECT, st enable st failed, current is %s, st_get_eventsys_name());}
#elseif (st_set_eventsys(ST_EVENTSYS_ALT) -1) {return srs_error_new(ERROR_THREAD, st enable st failed, current is %s, st_get_eventsys_name());}
#endif// Before ST init, we might have already initialized the background cid.
// SrsContextId cid _srs_context-get_id();
// if (cid.empty()) {
// cid _srs_context-generate_id();
// }int r0 0;if((r0 st_init()) ! 0){return srs_error_new(ERROR_THREAD, st initialize failed, r0%d, r0);}// Switch to the background cid.
// _srs_context-set_id(cid);printf(st_init success, use %s, st_get_eventsys_name());return srs_success;
}void srs_st_destroy(void)
{st_destroy();
}void srs_close_stfd(srs_netfd_t stfd)
{if (stfd) {// we must ensure the close is ok.int r0 st_netfd_close((st_netfd_t)stfd);if (r0) {// By _st_epoll_fd_close or _st_kq_fd_closeif (errno EBUSY) srs_assert(!r0);// By closeif (errno EBADF) srs_assert(!r0);if (errno EINTR) srs_assert(!r0);if (errno EIO) srs_assert(!r0);// Otherssrs_assert(!r0);}stfd NULL;}
}srs_error_t srs_fd_closeexec(int fd)
{int flags fcntl(fd, F_GETFD);flags | FD_CLOEXEC;if (fcntl(fd, F_SETFD, flags) -1) {return srs_error_new(ERROR_THREAD, FD_CLOEXEC fd%d, fd);}return srs_success;
}srs_error_t srs_fd_reuseaddr(int fd)
{int v 1;if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, v, sizeof(int)) -1) {return srs_error_new(ERROR_THREAD, SO_REUSEADDR fd%d, fd);}return srs_success;
}srs_error_t srs_fd_reuseport(int fd)
{
#if defined(SO_REUSEPORT)int v 1;if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, v, sizeof(int)) -1) {printf(SO_REUSEPORT failed for fd%d, fd);}
#else#warning SO_REUSEPORT is not supported by your OSsrs_warn(SO_REUSEPORT is not supported util Linux kernel 3.9);
#endifreturn srs_success;
}srs_error_t srs_fd_keepalive(int fd)
{
#ifdef SO_KEEPALIVEint v 1;if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, v, sizeof(int)) -1) {return srs_error_new(ERROR_THREAD, SO_KEEPALIVE fd%d, fd);}
#endifreturn srs_success;
}srs_thread_t srs_thread_self()
{return (srs_thread_t)st_thread_self();
}void srs_thread_exit(void* retval)
{st_thread_exit(retval);
}int srs_thread_join(srs_thread_t thread, void **retvalp)
{return st_thread_join((st_thread_t)thread, retvalp);
}void srs_thread_interrupt(srs_thread_t thread)
{st_thread_interrupt((st_thread_t)thread);
}void srs_thread_yield()
{st_thread_yield();
}_ST_THREAD_CREATE_PFN _pfn_st_thread_create (_ST_THREAD_CREATE_PFN)st_thread_create;srs_error_t srs_tcp_connect(string server, int port, srs_utime_t tm, srs_netfd_t* pstfd)
{st_utime_t timeout ST_UTIME_NO_TIMEOUT;if (tm ! SRS_UTIME_NO_TIMEOUT) {timeout tm;}*pstfd NULL;srs_netfd_t stfd NULL;char sport[8];int r0 snprintf(sport, sizeof(sport), %d, port);srs_assert(r0 0 r0 (int)sizeof(sport));addrinfo hints;memset(hints, 0, sizeof(hints));hints.ai_family AF_UNSPEC;hints.ai_socktype SOCK_STREAM;addrinfo* r NULL;SrsAutoFreeH(addrinfo, r, freeaddrinfo);if(getaddrinfo(server.c_str(), sport, (const addrinfo*)hints, r)) {return srs_error_new(ERROR_THREAD, get address info);}int sock socket(r-ai_family, r-ai_socktype, r-ai_protocol);if(sock -1){return srs_error_new(ERROR_SOCKET_CREATE, create socket);}srs_assert(!stfd);stfd st_netfd_open_socket(sock);if(stfd NULL){::close(sock);return srs_error_new(ERROR_THREAD, open socket);}if (st_connect((st_netfd_t)stfd, r-ai_addr, r-ai_addrlen, timeout) -1){srs_close_stfd(stfd);return srs_error_new(ERROR_THREAD, connect to %s:%d, server.c_str(), port);}*pstfd stfd;return srs_success;
}srs_error_t do_srs_tcp_listen(int fd, addrinfo* r, srs_netfd_t* pfd)
{srs_error_t err srs_success;// Detect alive for TCP connection.// see https://github.com/ossrs/srs/issues/1044if ((err srs_fd_keepalive(fd)) ! srs_success) {return srs_error_wrap(err, set keepalive);}if ((err srs_fd_closeexec(fd)) ! srs_success) {return srs_error_wrap(err, set closeexec);}if ((err srs_fd_reuseaddr(fd)) ! srs_success) {return srs_error_wrap(err, set reuseaddr);}if ((err srs_fd_reuseport(fd)) ! srs_success) {return srs_error_wrap(err, set reuseport);}if (::bind(fd, r-ai_addr, r-ai_addrlen) -1) {return srs_error_new(ERROR_THREAD, bind);}if (::listen(fd, SERVER_LISTEN_BACKLOG) -1) {return srs_error_new(ERROR_THREAD, listen);}if ((*pfd srs_netfd_open_socket(fd)) NULL){return srs_error_new(ERROR_THREAD, st open);}return err;
}srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd)
{srs_error_t err srs_success;char sport[8];int r0 snprintf(sport, sizeof(sport), %d, port);srs_assert(r0 0 r0 (int)sizeof(sport));addrinfo hints;memset(hints, 0, sizeof(hints));hints.ai_family AF_UNSPEC;hints.ai_socktype SOCK_STREAM;hints.ai_flags AI_NUMERICHOST;addrinfo* r NULL;SrsAutoFreeH(addrinfo, r, freeaddrinfo);if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)hints, r)) {return srs_error_new(ERROR_THREAD, getaddrinfo hints(%d,%d,%d),hints.ai_family, hints.ai_socktype, hints.ai_flags);}int fd 0;if ((fd socket(r-ai_family, r-ai_socktype, r-ai_protocol)) -1) {return srs_error_new(ERROR_SOCKET_CREATE, socket domain%d, type%d, protocol%d,r-ai_family, r-ai_socktype, r-ai_protocol);}if ((err do_srs_tcp_listen(fd, r, pfd)) ! srs_success) {::close(fd);return srs_error_wrap(err, fd%d, fd);}return err;
}srs_error_t do_srs_udp_listen(int fd, addrinfo* r, srs_netfd_t* pfd)
{srs_error_t err srs_success;if ((err srs_fd_closeexec(fd)) ! srs_success) {return srs_error_wrap(err, set closeexec);}if ((err srs_fd_reuseaddr(fd)) ! srs_success) {return srs_error_wrap(err, set reuseaddr);}if ((err srs_fd_reuseport(fd)) ! srs_success) {return srs_error_wrap(err, set reuseport);}if (::bind(fd, r-ai_addr, r-ai_addrlen) -1) {return srs_error_new(ERROR_THREAD, bind);}if ((*pfd srs_netfd_open_socket(fd)) NULL){return srs_error_new(ERROR_THREAD, st open);}return err;
}srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd)
{srs_error_t err srs_success;char sport[8];int r0 snprintf(sport, sizeof(sport), %d, port);srs_assert(r0 0 r0 (int)sizeof(sport));addrinfo hints;memset(hints, 0, sizeof(hints));hints.ai_family AF_UNSPEC;hints.ai_socktype SOCK_DGRAM;hints.ai_flags AI_NUMERICHOST;addrinfo* r NULL;SrsAutoFreeH(addrinfo, r, freeaddrinfo);if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)hints, r)) {return srs_error_new(ERROR_THREAD, getaddrinfo hints(%d,%d,%d),hints.ai_family, hints.ai_socktype, hints.ai_flags);}int fd 0;if ((fd socket(r-ai_family, r-ai_socktype, r-ai_protocol)) -1) {return srs_error_new(ERROR_SOCKET_CREATE, socket domain%d, type%d, protocol%d,r-ai_family, r-ai_socktype, r-ai_protocol);}if ((err do_srs_udp_listen(fd, r, pfd)) ! srs_success) {::close(fd);return srs_error_wrap(err, fd%d, fd);}return err;
}srs_cond_t srs_cond_new()
{return (srs_cond_t)st_cond_new();
}int srs_cond_destroy(srs_cond_t cond)
{return st_cond_destroy((st_cond_t)cond);
}int srs_cond_wait(srs_cond_t cond)
{return st_cond_wait((st_cond_t)cond);
}int srs_cond_timedwait(srs_cond_t cond, srs_utime_t timeout)
{return st_cond_timedwait((st_cond_t)cond, (st_utime_t)timeout);
}int srs_cond_signal(srs_cond_t cond)
{return st_cond_signal((st_cond_t)cond);
}int srs_cond_broadcast(srs_cond_t cond)
{return st_cond_broadcast((st_cond_t)cond);
}srs_mutex_t srs_mutex_new()
{return (srs_mutex_t)st_mutex_new();
}int srs_mutex_destroy(srs_mutex_t mutex)
{if (!mutex) {return 0;}return st_mutex_destroy((st_mutex_t)mutex);
}int srs_mutex_lock(srs_mutex_t mutex)
{return st_mutex_lock((st_mutex_t)mutex);
}int srs_mutex_unlock(srs_mutex_t mutex)
{return st_mutex_unlock((st_mutex_t)mutex);
}int srs_key_create(int *keyp, void (*destructor)(void *))
{return st_key_create(keyp, destructor);
}int srs_thread_setspecific(int key, void *value)
{return st_thread_setspecific(key, value);
}void *srs_thread_getspecific(int key)
{return st_thread_getspecific(key);
}int srs_thread_setspecific2(srs_thread_t thread, int key, void* value)
{return st_thread_setspecific2((st_thread_t)thread, key, value);
}int srs_netfd_fileno(srs_netfd_t stfd)
{return st_netfd_fileno((st_netfd_t)stfd);
}int srs_usleep(srs_utime_t usecs)
{return st_usleep((st_utime_t)usecs);
}srs_netfd_t srs_netfd_open_socket(int osfd)
{return (srs_netfd_t)st_netfd_open_socket(osfd);
}srs_netfd_t srs_netfd_open(int osfd)
{return (srs_netfd_t)st_netfd_open(osfd);
}int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout)
{return st_recvfrom((st_netfd_t)stfd, buf, len, from, fromlen, (st_utime_t)timeout);
}int srs_sendto(srs_netfd_t stfd, void *buf, int len, const struct sockaddr * to, int tolen, srs_utime_t timeout)
{return st_sendto((st_netfd_t)stfd, buf, len, to, tolen, (st_utime_t)timeout);
}int srs_recvmsg(srs_netfd_t stfd, struct msghdr *msg, int flags, srs_utime_t timeout)
{return st_recvmsg((st_netfd_t)stfd, msg, flags, (st_utime_t)timeout);
}int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime_t timeout)
{return st_sendmsg((st_netfd_t)stfd, msg, flags, (st_utime_t)timeout);
}srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout)
{return (srs_netfd_t)st_accept((st_netfd_t)stfd, addr, addrlen, (st_utime_t)timeout);
}ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout)
{return st_read((st_netfd_t)stfd, buf, nbyte, (st_utime_t)timeout);
}bool srs_is_never_timeout(srs_utime_t tm)
{return tm SRS_UTIME_NO_TIMEOUT;
}SrsStSocket::SrsStSocket()
{init(NULL);
}SrsStSocket::SrsStSocket(srs_netfd_t fd)
{init(fd);
}SrsStSocket::~SrsStSocket()
{
}void SrsStSocket::init(srs_netfd_t fd)
{stfd_ fd;stm rtm SRS_UTIME_NO_TIMEOUT;rbytes sbytes 0;
}void SrsStSocket::set_recv_timeout(srs_utime_t tm)
{rtm tm;
}srs_utime_t SrsStSocket::get_recv_timeout()
{return rtm;
}void SrsStSocket::set_send_timeout(srs_utime_t tm)
{stm tm;
}srs_utime_t SrsStSocket::get_send_timeout()
{return stm;
}int64_t SrsStSocket::get_recv_bytes()
{return rbytes;
}int64_t SrsStSocket::get_send_bytes()
{return sbytes;
}srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
{srs_error_t err srs_success;srs_assert(stfd_);ssize_t nb_read;if (rtm SRS_UTIME_NO_TIMEOUT) {nb_read st_read((st_netfd_t)stfd_, buf, size, ST_UTIME_NO_TIMEOUT);} else {nb_read st_read((st_netfd_t)stfd_, buf, size, rtm);}if (nread) {*nread nb_read;}// On success a non-negative integer indicating the number of bytes actually read is returned// (a value of 0 means the network connection is closed or end of file is reached).// Otherwise, a value of -1 is returned and errno is set to indicate the error.if (nb_read 0) {if (nb_read 0 errno ETIME) {return srs_error_new(ERROR_THREAD, timeout %d ms, srsu2msi(rtm));}if (nb_read 0) {errno ECONNRESET;}return srs_error_new(ERROR_THREAD, read);}rbytes nb_read;return err;
}srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
{srs_error_t err srs_success;srs_assert(stfd_);ssize_t nb_read;if (rtm SRS_UTIME_NO_TIMEOUT) {nb_read st_read_fully((st_netfd_t)stfd_, buf, size, ST_UTIME_NO_TIMEOUT);} else {nb_read st_read_fully((st_netfd_t)stfd_, buf, size, rtm);}if (nread) {*nread nb_read;}// On success a non-negative integer indicating the number of bytes actually read is returned// (a value less than nbyte means the network connection is closed or end of file is reached)// Otherwise, a value of -1 is returned and errno is set to indicate the error.if (nb_read ! (ssize_t)size) {if (nb_read 0 errno ETIME) {return srs_error_new(ERROR_THREAD, timeout %d ms, srsu2msi(rtm));}if (nb_read 0) {errno ECONNRESET;}return srs_error_new(ERROR_THREAD, read fully, size%d, nn%d, size, nb_read);}rbytes nb_read;return err;
}srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
{srs_error_t err srs_success;srs_assert(stfd_);ssize_t nb_write;if (stm SRS_UTIME_NO_TIMEOUT) {nb_write st_write((st_netfd_t)stfd_, buf, size, ST_UTIME_NO_TIMEOUT);} else {nb_write st_write((st_netfd_t)stfd_, buf, size, stm);}if (nwrite) {*nwrite nb_write;}// On success a non-negative integer equal to nbyte is returned.// Otherwise, a value of -1 is returned and errno is set to indicate the error.if (nb_write 0) {if (nb_write 0 errno ETIME) {return srs_error_new(ERROR_THREAD, write timeout %d ms, srsu2msi(stm));}return srs_error_new(ERROR_THREAD, write);}sbytes nb_write;return err;
}srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{srs_error_t err srs_success;srs_assert(stfd_);ssize_t nb_write;if (stm SRS_UTIME_NO_TIMEOUT) {nb_write st_writev((st_netfd_t)stfd_, iov, iov_size, ST_UTIME_NO_TIMEOUT);} else {nb_write st_writev((st_netfd_t)stfd_, iov, iov_size, stm);}if (nwrite) {*nwrite nb_write;}// On success a non-negative integer equal to nbyte is returned.// Otherwise, a value of -1 is returned and errno is set to indicate the error.if (nb_write 0) {if (nb_write 0 errno ETIME) {return srs_error_new(ERROR_THREAD, writev timeout %d ms, srsu2msi(stm));}return srs_error_new(ERROR_THREAD, writev);}sbytes nb_write;return err;
}SrsTcpClient::SrsTcpClient(string h, int p, srs_utime_t tm)
{stfd_ NULL;io new SrsStSocket();host h;port p;timeout tm;
}SrsTcpClient::~SrsTcpClient()
{srs_freep(io);srs_close_stfd(stfd_);
}srs_error_t SrsTcpClient::connect()
{srs_error_t err srs_success;srs_netfd_t stfd NULL;if ((err srs_tcp_connect(host, port, timeout, stfd)) ! srs_success) {return srs_error_wrap(err, tcp: connect %s:%d to%dms, host.c_str(), port, srsu2msi(timeout));}// TODO: FIMXE: The timeout set on io need to be set to new object.srs_freep(io);io new SrsStSocket(stfd);srs_close_stfd(stfd_);stfd_ stfd;return err;
}void SrsTcpClient::set_recv_timeout(srs_utime_t tm)
{io-set_recv_timeout(tm);
}srs_utime_t SrsTcpClient::get_recv_timeout()
{return io-get_recv_timeout();
}void SrsTcpClient::set_send_timeout(srs_utime_t tm)
{io-set_send_timeout(tm);
}srs_utime_t SrsTcpClient::get_send_timeout()
{return io-get_send_timeout();
}int64_t SrsTcpClient::get_recv_bytes()
{return io-get_recv_bytes();
}int64_t SrsTcpClient::get_send_bytes()
{return io-get_send_bytes();
}srs_error_t SrsTcpClient::read(void* buf, size_t size, ssize_t* nread)
{return io-read(buf, size, nread);
}srs_error_t SrsTcpClient::read_fully(void* buf, size_t size, ssize_t* nread)
{return io-read_fully(buf, size, nread);
}srs_error_t SrsTcpClient::write(void* buf, size_t size, ssize_t* nwrite)
{return io-write(buf, size, nwrite);
}srs_error_t SrsTcpClient::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{return io-writev(iov, iov_size, nwrite);
}
ST协程库测试
#include srs_app_st.hpp
#include st.h//0.在ST_TEST对象里启动一个协程
class ST_TEST : public ISrsCoroutineHandler{
public:ST_TEST(){trd NULL;srs_freep(trd);trd new SrsSTCoroutine(ST_TEST-, this,9527ID);//2.new一个ST对象}srs_error_t startST(){srs_error_t err srs_success;if ((err trd-start()) ! srs_success) {//3.start()创建协程return srs_error_wrap(err, start timer);}printf(\nST.startST\n);return err;}
public: virtual srs_error_t cycle() {//4.协程处理函数回调cycle()srs_error_t err srs_success;printf(ST.cycle\n);return err;}
private:SrsCoroutine* trd;
};srs_st_init();//1.初始化STST_TEST *pST_TEST new ST_TEST;pST_TEST-startST();st_thread_exit(NULL);打印
st_init success, use epoll
ST.startST
ST.cycle
SrsAutoFree测试
// Prints a line on construction and on destruction so the automatic release
// is visible in the program output.
class SrsAutoFree_TEST{
public:
    SrsAutoFree_TEST(){printf("SrsAutoFree_TEST\n");}
    ~SrsAutoFree_TEST(){printf("~SrsAutoFree_TEST\n");}
};

// Allocates an object and hands it to SrsAutoFree, which deletes it when the
// function scope ends.
void testAutoFree()
{
    SrsAutoFree_TEST *pSrsAutoFree_TEST = nullptr;
    pSrsAutoFree_TEST = new SrsAutoFree_TEST;
    SrsAutoFree(SrsAutoFree_TEST, pSrsAutoFree_TEST);
}

// Usage (place inside main):
testAutoFree();
打印
SrsAutoFree_TEST
~SrsAutoFree_TEST