麦克船长的 OpenRTMFP/Cumulus 原理、源码及实践 2:CumulusServer 源码启动流程分析

本文目录

本文对 CumulusServer 的启动流程进行了详细的源码解读,其中还包括 CumulusServer 如何处理命令行的各个输入选项、各项命令、如何 dump logs、载入配置、处理日志。首先要知道的是,OpenRTMFP/Cumulus 中使用到的库有 Poco、OpenSSL 和 Lua。

一、Cumulus 启动源码分析

1、main.cpp 中的 main() 函数

入口在 main.cpp 中:

int main(int argc, char* argv[]) {

先检查内存泄露,不过目前这个开发中的项目还没有实现这个功能,只是个空函数:

    DetectMemoryLeak();

然后会创建一个匿名的 CumulusServer 对象,并调用其 run() 函数,该函数由 CumulusServerServerApplication 中继承而来,而 ServerApplication 由是从 Application 继承而来的。CumulusServer 对象调用 run() 函数,实际是 ServerApplicationrun() 函数,ServerApplicationrun() 函数则是调用 Application 的函数,而该 run() 函数则是先调用 initialize() 函数,然后调用 main() 函数,然后调用 uninitialize() 函数。如果 initialize() 函数被调用时抛出异常,则不会执行 main() 函数,但仍然会执行 uninitialize() 函数:

    // Runs the application by performing additional initializations
    // and calling the main() method.
    return CumulusServer().run(argc, argv);
}

2、main.cpp 中的 CumulusServer() 构造函数

CumulusServer 的构造函数定义为:

CumulusServer(): _helpRequested(false), // 显示帮助信息
                 _pCirrus(NULL),
                 _middle(false),
                 _isInteractive(true),
                 _pLogFile(NULL) {
}

3、main.cpp 中的 CumulusServerinitialize() 成员函数

在执行 main() 函数之前,CumulusServer 会启动 initialize() 函数,传入的参数就是 CumulusServer 自己,可以猜到 Poco::Util::Applicationrun 方法中,调用该函数时的参数是 this

void initialize(Application& self) {

调用父函数 ServerApplication 的初始化函数:

ServerApplication::initialize(self);

再继续下面的源码分析之前,先要知道,根据 Poco::Util::ApplicationApplication 类有一些内置的配置属性,如下:

  • application.path: 可执行文件的绝对路径;
  • application.name: 可执行文件的文件名(含扩展名);
  • application.baseName: 可执行文件的文件名(不含扩展名)
  • application.dir: 可执行文件的所在目录;
  • application.configDir: 配置文件所在目录;

所以下面就读取了可执行文件的所在目录,其中第二个参数表示默认值(即当前目录):

    string dir = config().getString("application.dir", "./");

然后读取配置文件,目录为上一句所得到的 dir,文件名(不含扩展名)为 application.basename 内置配置属性值,其默认值为 CumulusServer,然后加上点和扩展名 .ini

    loadConfiguration(dir
        + config().getString("application.baseName", "CumulusServer")
        + ".ini");

这样就加载完配置了。然后查看当前的进程是从命令行运行的(命令行是交互的,所以是 interactive),还是以 daemon 方式运行的,这个函数是ServerApplication的一个成员函数:

    _isInteractive = isInteractive();

然后获取表示日志文件所在目录的字符串,其中 logs.directory 是外置配置属性(配置文件中),其默认值为上面获取到的可执行文件路径(一般为当前路径)与 logs 的组合,即一般为当前目录下的 logs 目录:

    string logDir(config().getString("logs.directory", dir + "logs"));

创建日志文件目录:

    File(logDir).createDirectory();

日志文件绝对路径,logs 为默认的日志文件名(不含扩展名的部分),扩展名用0:

    _logPath = logDir + "/" + config().getString("logs.name", "log") + ".";
    _pLogFile = new File(_logPath + "0");

用日志流打开日志文件(方式为追加写入):

    _logStream.open(_pLogFile->path(), ios::in | ios::ate);

Logs 是一个方法类(其中的 public 函数都是静态的),SetLogger 的作用就是将Logs中的似有静态成员设置为某个 Cumulus::Logger 对象(由于 CumulusServer 继承了 Cumulus::Logger)。

    Logs::SetLogger(*this);
}

4、main.cpp 中的 CumulusServermain() 成员函数

OpenRTMFP/Cumulus 是一个基于 Poco::Util::Application 的服务端应用(准确的说是基于 Poco::Util::ServerApplication 的服务端应用)。如果没有特殊的启动要求,可以调用 Poco/Application.h 中定义的宏 POCO_APP_MAIN 来完成初始化、日志和启动(该宏会根据不同的平台启用不同的 main() 函数)。

run() 函数在调用完 initialize() 函数后,会调用 CumulusServer 中的 main() 函数,该 main() 函数的定义在 main.cpp 中:

int main(const std::vector<std::string>& args) {

首先看是否是要求帮助信息,displayHelp 是借助 Poco::Util::HelpFormatter 实现的,CumulusServer 的构造函数会在调用时将 _helpRequested 设置为 false

    if (_helpRequested) {
        displayHelp();
    }

如果不是,则进入启动状态,首先创建一个 RTMFPServerParams 对象 params,用来存储 OpenRTMFP/CumulusServer 的基本配置信息。

    else {
        try {
            RTMFPServerParams params;

params 存储 CumulusServer 的端口号和 CumulusEdge 的端口号:

            params.port = config().getInt("port", params.port);
            UInt16 edgesPort = config().getInt("edges.port",
                RTMFP_DEFAULT_PORT+1);
            if(config().getBool("edges.activated",false)) {
                if(edgesPort==0)
                    WARN("edges.port must have a positive value if \
                          edges.activated is true. Server edges is \
                          disactivated.");
                params.edgesPort=edgesPort;
            }

_pCirrusSocketAddress 的成员,是封装IP地址和端口号的对象。

            params.pCirrus = _pCirrus;
            params.middle = _middle;

UDB 所使用的缓冲区大小:

            params.udpBufferSize = config().getInt("udpBufferSize",
                params.udpBufferSize);
            params.keepAliveServer = config().getInt(
                "keepAliveServer",params.keepAliveServer);
            params.keepAlivePeer = config().getInt("keepAlivePeer",
                params.keepAlivePeer);

失败前 CumulusEdge 的尝试次数:

            params.edgesAttemptsBeforeFallback = config().getInt(
                "edges.attemptsBeforeFallback",
                params.edgesAttemptsBeforeFallback);
            Server server(config().getString("application.dir","./"),
                *this,config());
            server.start(params);

waitForTerminationRequest() 函数是 main() 函数中必须调用的,意为等待终止运行的操作请求。

            // wait for CTRL-C or kill
            waitForTerminationRequest();

一旦接收到终止操作的请求,就会执行下面这句,用以退出 OpenRTMFP/Cumulus 的运行:

            // Stop the server
            server.stop();

catch 一些可能产生的异常:

        } catch(Exception& ex) {
            FATAL("Configuration problem : %s",ex.displayText().c_str());
        } catch (exception& ex) {
            FATAL("CumulusServer : %s",ex.what());
        } catch (...) {
            FATAL("CumulusServer unknown error");
        }
    }

OpenRTMFP/CumulusServer 停止运行:

    return Application::EXIT_OK;
}

5、CumulusServerServerApplication 的子类

ServerApplication 对其子类有如下要求:

  • Subsystems must be registered in the constructor.
  • All non-trivial initializations must be made in the initialize()` method.
  • At the end of the main() method, waitForTerminationRequest() should be called.

6、ServerApplicationApplication 的子类

Application 对其子类的要求是,如下这些成员函数必须被覆盖:

  • initialize() (the one-argument, protected variant):上一篇已介绍过。
  • uninitialize():下面会介绍,Application 的 run() 函数会在调用 main() 函数后调用 uninitialize() 函数。
  • reinitialize()
  • defineOptions():定义命令行启动选项。
  • handleOption():响应相应的命令行选项。
  • main()

7、反初始化

CumulusServer 是继承 ServerApplication 的,ServerApplication 是继承 Application 的。Applicationrun() 函数会先调用 initialize(),然后调用 main(),最后调用 uninitialize。最后这个反初始化过程,在 CumulusServer 就是直接调用父类的 uninitialize 函数。

void uninitialize() {
    ServerApplication::uninitialize();
}

二、CumulusServer 各项交互功能的源码解读

1、命令行选项设定

CumulusServer 的命令行选项有:log(l)dump(d)cirrus(c)middle(m)help(h)

void defineOptions(OptionSet& options) {
    ServerApplication::defineOptions(options);

设定日志级别(0 - 8,默认是 6,表示 info 级别)。

    options.addOption(
        Option("log", "l", "Log level argument, must be beetween 0 and 8 : \
            nothing, fatal, critic, error, warn, note, info, debug, trace. \
            Default value is 6 (info), all logs until info level are displayed.")
                .required(false)
                .argument("level")
                .repeatable(false));

其他一些选项:

    options.addOption(
        Option("dump", "d", "Enables packet traces in logs. Optional arguments \
            are 'middle' or 'all' respectively to displays just middle packet \
            process or all packet process. If no argument is given, just outside \
            packet process will be dumped.",false,"middle|all",false)
                .repeatable(false));
    options.addOption(
        Option("cirrus", "c", "Cirrus address to activate a 'man-in-the-middle' \
            developer mode in bypassing flash packets to the official cirrus \
            server of your choice, it's a instable mode to help Cumulus developers, \
            \"p2p.rtmfp.net:10000\" for example. By adding the 'dump' argument, \
            you will able to display Cirrus/Flash packet exchange in your logs \
            (see 'dump' argument).",false,"address",true)
                .repeatable(false));
    options.addOption(
        Option("middle", "m","Enables a 'man-in-the-middle' developer mode \
            between two peers. It's a instable mode to help Cumulus developers. \
            By adding the 'dump' argument, you will able to display Flash/Flash \
            packet exchange in your logs (see 'dump' argument).")
                .repeatable(false));

显示帮助信息的选项:

    options.addOption(
        Option("help", "h", "Displays help information about command-line usage.")
            .required(false)
            .repeatable(false));
}

OptionSetPoco::Util::OptionSet,调用addOption可以向其中增加选项Option。其中required和repeatable表示:

Sets whether the option is required (flag == true) or optional (flag == false). Returns true if the option can be specified more than once, or false if at most once. 当需要显示帮助信息时,调用如下函数:

void displayHelp() {
    HelpFormatter helpFormatter(options());
    helpFormatter.setCommand(commandName());
    helpFormatter.setUsage("OPTIONS");
    helpFormatter.setHeader("CumulusServer, open source RTMFP server");
    helpFormatter.format(cout);
}
  • setCommand(): Sets the command name.
  • setUsage(): Sets the usage string.
  • setHeader(): Sets the header string.
  • format(): Writes the formatted help text to the given stream.

2、处理命令行选项

参数是选项名和选项值。

void handleOption(const std::string& name, const std::string& value) {
    ServerApplication::handleOption(name, value);

如果选项是帮助:

    if (name == "help")
        _helpRequested = true;

如果是cirrus,即该服务的 IP 和端口号,Poco::URI 中有协议名(Scheme)、IP 地址(Host)、端口号(Port)、查询串(Query)等等。

    else if (name == "cirrus") {
        try {
            URI uri("rtmfp://"+value);
            _pCirrus = new SocketAddress(uri.getHost(),uri.getPort());
            NOTE("Mode 'man in the middle' : the exchange will bypass to '%s'",value.c_str());
        } catch(Exception& ex) {
            ERROR("Mode 'man in the middle' error : %s",ex.message().c_str());
        }

如果选项是dump日志:

    } else if (name == "dump") {
        if(value == "all")
            Logs::SetDump(Logs::ALL);
        else if(value == "middle")
            Logs::SetDump(Logs::MIDDLE);
        else
            Logs::SetDump(Logs::EXTERNAL);

如果选项是middle:

    } else if (name == "middle")
        _middle = true;

如果选项是log,表示设定日志级别:

    else if (name == "log")
        Logs::SetLevel(atoi(value.c_str()));
}

6、Dump logs

先加一个作用域锁,然后再向日志流写数据。

void dumpHandler(const UInt8* data,UInt32 size) {
    ScopedLock<FastMutex> lock(_logMutex);
    cout.write((const char*)data, size);
    _logStream.write((const char*)data,size);
    manageLogFile();
}

调用 manageLogFile,主要做一些日志大小超出限制的处理。

void manageLogFile() {

先判断是否超过日志文件的大小上线,LOG_SIZE是1000000字节(即约 1 MB)。

    if (_pLogFile->getSize() > LOG_SIZE) {
        _logStream.close();
        int num = 10;

打开新日志文件:

        File file(_logPath + "10");

如果该文件已经存在,则先删除:

        if (file.exists())
            file.remove();

        while (--num >= 0) {
            file = _logPath + NumberFormatter::format(num);
            if (file.exists())
                file.renameTo(_logPath + NumberFormatter::format(num + 1));
        }
        _logStream.open(_pLogFile->path(), ios::in | ios::ate);
    }   
}

3、停止运行

CumulusServer 继承了 ApplicationKiller,该类中有纯虚函数 kill() 需要被实现,于是有:

void kill() {
    terminate();
}

ApplicationKiller 的定义在 ApplicationKiller.h 中,如下:

class ApplicationKiller {
public:
    ApplicationKiller(){}
    virtual ~ApplicationKiller(){}
 
    virtual void kill()=0;
};

4、载入配置

在initialize()函数中调用,上一篇已提到过。

void loadConfiguration(const string& path) {
    try {
        ServerApplication::loadConfiguration(path);
    } catch(...) {
    }
}

5、处理日志

void logHandler(Thread::TID threadId,
                const std::string& threadName,
                Priority priority,
                const char *filePath,
                long line, 
                const char *text) {

作用域锁:

    ScopedLock<FastMutex> lock(_logMutex);
 
    Path path(filePath);
    string file;
    if (path.getExtension() == "lua")
        file += path.directory(path.depth()-1) + "/";

如果是命令行交互模式(即不是 daemon 模式):

    if (_isInteractive)
        printf("%s  %s[%ld] %s\n",
            g_logPriorities[priority - 1],
            (file + path.getBaseName()).c_str(),
            line,
            text);

向日志流输出一句日志:

    _logStream << DateTimeFormatter::format(LocalDateTime(),"%d/%m %H:%M:%S.%c  ")
            << g_logPriorities[priority-1] 
            << '\t' << threadName 
            << '(' << threadId << ")\t"
            << (file + path.getFileName()) 
            << '[' << line << "]  " 
            << text << std::endl;
 
    _logStream.flush();

日志文件的善后处理(主要处理文件大小限制可能产生的问题):

    manageLogFile();
}

三、main.cppmain() 函数源码分析

1、main.cpp 中的 main() 函数中的 server

main.cpp 中真正启动的是 server,它继承自 Cumulus::RTMFPServer,而 Cumulus::RTMFPServer 又继承自 Cumulus::StartableCumulus::GatewayCumulus::Handler。而 Cumulus::Startable 继承自 Poco::Runnable,所以其是一个可以运行的线程。在 OpenRTMFP/CumulusServer 中,这是主线程。

Server server(config().getString("application.dir", "./"), *this, config());
server.start(params);

这是 CumulusServer/Server.h 中定义的,其构造函数的原型为:

Server(const std::string& root,
       ApplicationKiller& applicationKiller,
       const Poco::Util::AbstractConfiguration& configurations);

个参数含义如下:

The Path Root for the Server Application Killer for Termanting the Server Application Server Configuration

距离来说,在我的 Worksapce 中:

root/Users/michael/Development/workspace/eclipse/OpenRTMFP-Cumulus/Debug/ 构造函数的初始化列表极长:

Server::Server(const std::string& root,
               ApplicationKiller& applicationKiller,
               const Util::AbstractConfiguration& configurations) 
    : _blacklist(root + "blacklist", *this),
      _applicationKiller(applicationKiller),
      _hasOnRealTime(true),
      _pService(NULL),
      luaMail(_pState=Script::CreateState(),
              configurations.getString("smtp.host","localhost"),
              configurations.getInt("smtp.port",SMTPSession::SMTP_PORT),
              configurations.getInt("smtp.timeout",60)) {

下面调用 Poco::File 创建目录:

    File((string&)WWWPath = root + "www").createDirectory();

因为 roor/Users/michael/Development/workspace/eclipse/OpenRTMFP-Cumulus/Debug/ 目录,所以 WWWPath 就是 /Users/michael/Development/workspace/eclipse/OpenRTMFP-Cumulus/Debug/www 目录。然后初始化 GlobalTable,这个 GlobalTable 是和 Lua 有关的东东,这里暂不细说,先知道与 Lua 相关就好。

    Service::InitGlobalTable(_pState);

下面就涉及到了 Lua script 了:

    SCRIPT_BEGIN(_pState)
        SCRIPT_CREATE_PERSISTENT_OBJECT(Invoker,LUAInvoker,*this)
        readNextConfig(_pState,configurations,"");
        lua_setglobal(_pState,"cumulus.configs");
    SCRIPT_END
}

其中 SCRIPT_BEGINSCRIPT_CREATE_PERSISTENT_OBJECT和SCRIPT_END 都是宏,其定义在 Script.h 文件中,如下:

#define SCRIPT_BEGIN(STATE) \
    if (lua_State* __pState = STATE) { \
        const char* __error=NULL;
 
#define SCRIPT_CREATE_PERSISTENT_OBJECT(TYPE,LUATYPE,OBJ) \
    Script::WritePersistentObject<TYPE,LUATYPE>(__pState,OBJ); \
    lua_pop(__pState,1);
 
#define SCRIPT_END }

SCRIPT_BEGIN和SCRIPT_END 经常用到,当与 Lua 相关的操作出现时,都会以这两个宏作为开头和结尾。

2、main.cppmain() 函数的 server.start()

void RTMFPServer::start(RTMFPServerParams& params) {

如果 OpenRTMFP/CumulusServer 正在运行,则返回并终止启动。

    if(running()) {
        ERROR("RTMFPServer server is yet running, call stop method before");
        return;
    }

设定端口号,如果端口号为 0,则返回并终止启动。

    _port = params.port;
    if (_port == 0) {
        ERROR("RTMFPServer port must have a positive value");
        return;
    }

设定 OpenRTMFP/CumulusEdge 的端口号,如果其端口号与 OpenRTMFP/CumulusSever 端口号相同,则返回并终止启动:

    _edgesPort = params.edgesPort;
    if(_port == _edgesPort) {
        ERROR("RTMFPServer port must different than RTMFPServer edges.port");
        return;
    }

Cirrus:

    _freqManage = 2000000; // 2 sec by default
    if(params.pCirrus) {
        _pCirrus = new Target(*params.pCirrus);
        _freqManage = 0; // no waiting, direct process in the middle case!
        NOTE("RTMFPServer started in man-in-the-middle mode with server %s \
             (unstable debug mode)", _pCirrus->address.toString().c_str());
    }

middle:

    _middle = params.middle;
    if(_middle)
        NOTE("RTMFPServer started in man-in-the-middle mode between peers \
              (unstable debug mode)");

UDP Buffer:

    (UInt32&)udpBufferSize = 
        params.udpBufferSize==0 ? 
            _socket.getReceiveBufferSize() : params.udpBufferSize;
 
    _socket.setReceiveBufferSize(udpBufferSize);
    _socket.setSendBufferSize(udpBufferSize);
    _edgesSocket.setReceiveBufferSize(udpBufferSize);
    _edgesSocket.setSendBufferSize(udpBufferSize);
 
    DEBUG("Socket buffer receving/sending size = %u/%u",
        udpBufferSize,
        udpBufferSize);
 
    (UInt32&)keepAliveServer = 
        params.keepAliveServer < 5 ? 5000 : params.keepAliveServer * 1000;
    (UInt32&)keepAlivePeer = 
        params.keepAlivePeer < 5 ? 5000 : params.keepAlivePeer * 1000;
    (UInt8&)edgesAttemptsBeforeFallback = params.edgesAttemptsBeforeFallback;
 
    setPriority(params.threadPriority);

启动线程,进入循环运行:

    Startable::start();
}

上句具体的源码实现为:

void Startable::start() {
    if (running())
        return;

如果在运行则返回并终止启动。然后加一个局部锁。

    ScopedLock<FastMutex> lock(_mutex);

如果不得不join()到主线程中,那就join()吧

    if(_haveToJoin) {
        _thread.join();
        _haveToJoin=false;
    }

然后就运行这个线程吧:

    _terminate = false;
    _thread.start(*this);
    _haveToJoin = true;
}

3、回顾一下整个启动流程

到此我们先回顾一下启动过程:

main.cpp 的启动入口 main() 函数开始,创建 Server 对象并启动(调用 start() 函数)。Server::start() 中调用其父类(RTMFPServer)的父类(Startable)的方法 Startable::start() 开启线程。 调用 Startable::start() 函数后,开启线城时传入的参数为 *this,所以就会运行 Startable::run()

4、RTMFPServer::prerun()Startable::prerun()RTMFPServer::run(...) 函数源码

Startable::run() 调用 Startable::prerun() 函数,但这个函数被 RTMFPServer 覆盖,所以会运行 RTMFPServer::prerun(),其源码如下:

bool RTMFPServer::prerun() {
    NOTE("RTMFP server starts on %u port",_port);

如果CumulusEdge:

    if (_edgesPort>0)
        NOTE("RTMFP edges server starts on %u port",_edgesPort);
 
    bool result = true;
    try {
        onStart();

运行线程:

        result = Startable::prerun();

处理异常:

    } catch(Exception& ex) {
        FATAL("RTMFPServer : %s",ex.displayText().c_str());
    } catch (exception& ex) {
        FATAL("RTMFPServer : %s",ex.what());
    } catch (...) {
        FATAL("RTMFPServer unknown error");
    }

如果跳出了,则终止运行:

    onStop();
 
    NOTE("RTMFP server stops");
    return result;
}

该函数内部又会调用父类的 Startable::prerun() 函数,该函数调用:

virtual void Startable::run(const volatile bool& terminate) = 0;

它是一个纯虚函数,由 RTMFPServer 实现。

Startable::prerun() 会调用 void run(const volatile bool& terminate) 方法,该方法被 RTMFPServer 覆盖了。

bool Startable::prerun() {
    run(_terminate);
    return !_terminate;
}

RTMFPServer 覆盖 Startablerun(const volatile bool &terminate) 方法。

void RTMFPServer::run(const volatile bool& terminate) {
    ...
}