一个工业级、跨平台、轻量级的tcp网络服务框架:gevent

编程

作为公司的公共产品,经常有这样的需求:就是新建一个本地服务,产品线作为客户端通过 tcp 接入本地服务,来获取想要的业务能力。

与印象中动辄处理成千上万连接的 tcp 网络服务不同,这个本地服务是跑在客户机器上的,Win32 上作为开机自启动的 windows 服务运行;

Linux 上作为 daemon 在后台运行。总的说来就是用于接收几个产品进程的连接,因此轻量化是其最重要的要求,在这个基础上要能兼顾跨平台就可以了。

其实主要就是 windows,再兼顾一点儿 linux。

 

考察了几个现有的开源网络框架,从 ACE 、boost::asio 到 libevent,都有不尽于人意的地方:

a) ACE:太重,只是想要一个网络框架,结果它扒拉扒拉一堆全提供了,不用还不行;

b) boost::asio:太复杂,牵扯到 boost 库,并且引入了一堆 c++ 模板,需要高版本 c++ 编译器支持;

c) libevent:这个看着不错,当时确实用这个做底层封装了一版,结果发版后发现一个比较致命的问题,导致在防火墙设置比较严格的机器上初始化失败,这个后面我会详细提到。

 

其它的就更不用说了,之前也粗略看过陈硕的 muddo,总的感觉吧,它是基于其它开源框架不足地方改进的一个库,有相当可取的地方,但是这个改进的方向也主要是解决更大并发、更多连接,不是我的痛点,所以没有继续深入研究。

 

好了,与其在不同开源框架之间纠结,不如自己动手写一个。

反正我的场景比较固定,不用像它们那样面面俱,我给自己罗列了一些这个框架需要支持基本的功能:

1)同步写、异步读;

2)可同时监听多路事件,基于 1)这里只针对异步 READ 事件(包含连接进入、连接断开),写数据是同步的,因而不需要处理异步 WRITE 事件;

3)要有设置一次性和周期性定时器的能力 (业务决定的);

4)不需要处理信号 (windows 上也没信号这一说,linux 自己搞搞 sigaction 就好啦);

……

 

虽然这个框架未来只会运行在用户的单机上,但是我不希望它一出生就带有性能缺陷,所以性能平平的 select 没能进入我的法眼,我决定给它装上最强大的心脏:

Windows 平台: iocp

Linux 平台:epoll

 

ok,从需求到底层技术路线,貌似都讲清楚了,依照 libevent 我给它取名为 gevent,下面我们从代码级别看下这个框架是怎么简化 tcp 服务搭建这类工作的。

首先看一下这个 tcp 服务框架的 sample:

svc_handler.h

 1 #include "EventBase.h"

2 #include "EventHandler.h"

3

4class GMyEventBase : public GEventBase

5{

6public:

7 GEventHandler* create_handler ();

8};

9

10

11class svc_handler : public GJsonEventHandler

12{

13public:

14virtual ~svc_handler () {}

15virtualvoid on_read_msg (Json::Value const& val);

16 };

epoll_svc.cpp

 1 #include <stdio.h>

2 #include "svc_handler.h"

3 #include <signal.h>

4

5GMyEventBase g_base;

6 GEventHandler* GMyEventBase::create_handler ()

7{

8returnnew svc_handler;

9}

10

11void sig_int (int signo)

12{

13 printf ("%d caught

", signo);

14 g_base.exit (1);

15 printf ("exit ok

");

16}

17

18int main (int argc, char *argv[])

19{

20if (argc < 2)

21 {

22 printf ("usage: epoll_svc port

");

23return -1;

24 }

25

26 unsigned short port = atoi (argv[1]);

27

28#ifndef WIN32

29struct sigaction act;

30 act.sa_handler = sig_int;

31 sigemptyset(&act.sa_mask);

32 act.sa_flags = SA_RESTART;

33if (sigaction (SIGINT, &act, NULL) < 0)

34 {

35 printf ("install SIGINT failed, errno %d

", errno);

36return -1;

37 }

38else

39 printf ("install SIGINT ok

");

40#endif

41

42// to test small message block

43if (g_base.init (/*8, 10*/) < 0)

44return -1;

45

46 printf ("init ok

");

47do

48 {

49if (!g_base.listen (port))

50 {

51 g_base.exit (0);

52 printf ("exit ok

");

53break;

54 }

55

56 printf ("listen ok

");

57 g_base.run ();

58 printf ("run over

");

59 } while (0);

60

61 g_base.fini ();

62 printf ("fini ok

");

63

64 g_base.cleanup ();

65 printf ("cleanup ok

");

66return0;

67 }

 

这个服务的核心是 GMyEventBase 类,它使用了框架中的 GEventBase 类,从后者派生而来,

只改写了一个 create_handler 接口来提供我们的事件处理对象 svc_handler,它是从框架中的 GEventHandler 派生而来,

svc_handler 只改写了一个 on_read_msg 来处理 Json 格式的消息输入。

 

程序的运行就是分别调用 GMyEventBase(实际上是GEventBase)  的 init / listen / run / fini / cleaup 方法。

而与业务相关的代码,都在 svc_handler 中处理:

svc_handler.cpp

 1 #include "svc_handler.h"

2

3void svc_handler::on_read_msg (Json::Value const& val)

4{

5int key = val["key"].asInt ();

6 std::string data = val["data"].asString ();

7 printf ("got %d:%s

", key, data.c_str ());

8

9 Json::Value root;

10 Json::FastWriter writer;

11 root["key"] = key + 1;

12 root["data"] = data;

13

14int ret = 0;

15 std::string resp = writer.write(root);

16 resp = resp.substr (0, resp.length () - 1); // trim tailing

17if ((ret = send (resp)) <= 0)

18 printf ("send response failed, errno %d

", errno);

19else

20 printf ("response %d

", ret);

21 }

 

它期待 Json 格式的数据,并且有两个字段 key(int) 与 data (string),接收数据后将 key 增 1 后返回给客户端。

再来看下客户端 sample:

clt_handler.h

 1 #include "EventBaseAR.h"

2 #include "EventHandler.h"

3

4class GMyEventBase : public GEventBaseWithAutoReconnect

5{

6public:

7 GEventHandler* create_handler ();

8};

9

10

11class clt_handler : public GJsonEventHandler

12{

13public:

14virtual ~clt_handler () {}

15#ifdef TEST_TIMER

16virtualbool on_timeout (GEV_PER_TIMER_DATA *gptd);

17#endif

18virtualvoid on_read_msg (Json::Value const& val);

19 };

 

epoll_clt.cpp

  1 #include <stdio.h>

2 #include "clt_handler.h"

3 #include <signal.h>

4

5//#define TEST_READ

6//#define TEST_CONN

7//#define TEST_TIMER

8

9GMyEventBase g_base;

10 GEventHandler* GMyEventBase::create_handler ()

11{

12returnnew clt_handler;

13}

14

15

16int sig_caught = 0;

17void sig_int (int signo)

18{

19 sig_caught = 1;

20 printf ("%d caught

", signo);

21 g_base.exit (0);

22 printf ("exit ok

");

23}

24

25void do_read (GEventHandler *eh, int total)

26{

27char buf[1024] = { 0 };

28int ret = 0, n = 0, key = 0, err = 0;

29char *ptr = nullptr;

30while ((total == 0 || n++ < total) && fgets (buf, sizeof(buf), stdin) != NULL)

31 {

32// skip

33 buf[strlen(buf) - 1] = 0;

34//n = sscanf (buf, "%d", &key);

35 key = strtol (buf, &ptr, 10);

36if (ptr == nullptr)

37 {

38 printf ("format: int string

");

39continue;

40 }

41

42 Json::Value root;

43 Json::FastWriter writer;

44 root["key"] = key;

45// skip space internal

46 root["data"] = *ptr == "" ? ptr + 1 : ptr;

47

48 std::string req = writer.write (root);

49 req = req.substr (0, req.length () - 1); // trim tailing

50if ((ret = eh->send (req)) <= 0)

51 {

52 err = 1;

53 printf ("send %d failed, errno %d

", req.length (), errno);

54break;

55 }

56else

57 printf ("send %d

", ret);

58 }

59

60if (total == 0)

61 printf ("reach end

");

62

63if (!err)

64 {

65 eh->disconnect ();

66 printf ("call disconnect to notify server

");

67 }

68

69// wait receiving thread

70//sleep (3);

71// if use press Ctrl+D, need to notify peer our break

72}

73

74#ifdef TEST_TIMER

75void test_timer (unsigned short port, int period_msec, int times)

76{

77int n = 0;

78 GEventHandler *eh = nullptr;

79

80do

81 {

82 eh = g_base.connect (port);

83if (eh == nullptr)

84break;

85

86 printf ("connect ok

");

87void* t = g_base.timeout (1000, period_msec, eh, NULL);

88if (t == NULL)

89 {

90 printf ("timeout failed

");

91break;

92 }

93else

94 printf ("set timer %p ok

", t);

95

96// to wait timer

97do

98 {

99 sleep (400);

100 printf ("wake up from sleep

");

101 } while (!sig_caught && n++ < times);

102

103 g_base.cancel_timer (t);

104 } while (0);

105}

106#endif

107

108#ifdef TEST_CONN

109void test_conn (unsigned short port, int per_read, int times)

110{

111# ifdef WIN32

112 srand (GetCurrentProcessId());

113 # else

114 srand (getpid ());

115# endif

116int n = 0, elapse = 0;

117 clt_handler *eh = nullptr;

118

119do

120 {

121 eh = (clt_handler *)g_base.connect (port);

122if (eh == nullptr)

123break;

124

125 printf ("connect ok

");

126

127 do_read (eh, per_read);

128# ifdef WIN32

129 elapse = rand() % 1000;

130 Sleep(elapse);

131 printf ("running %d ms

", elapse);

132 # else

133 elapse = rand () % 1000000;

134 usleep (elapse);

135 printf ("running %.3f ms

", elapse/1000.0);

136# endif

137

138 } while (!sig_caught && n++ < times);

139}

140#endif

141

142#ifdef TEST_READ

143void test_read (unsigned short port, int total)

144{

145int n = 0;

146 GEventHandler *eh = nullptr;

147

148do

149 {

150 eh = g_base.connect (port);

151if (eh == nullptr)

152break;

153

154 printf ("connect ok

");

155 do_read (eh, total);

156 } while (0);

157}

158#endif

159

160int main (int argc, char *argv[])

161{

162if (argc < 2)

163 {

164 printf ("usage: epoll_clt port

");

165return -1;

166 }

167

168 unsigned short port = atoi (argv[1]);

169

170#ifndef WIN32

171struct sigaction act;

172 act.sa_handler = sig_int;

173 sigemptyset(&act.sa_mask);

174// to ensure read be breaked by SIGINT

175 act.sa_flags = 0; //SA_RESTART;

176if (sigaction (SIGINT, &act, NULL) < 0)

177 {

178 printf ("install SIGINT failed, errno %d

", errno);

179return -1;

180 }

181#endif

182

183if (g_base.init (2) < 0)

184return -1;

185

186 printf ("init ok

");

187

188#if defined(TEST_READ)

189 test_read (port, 0); // 0 means infinite loop until user break

190#elif defined(TEST_CONN)

191 test_conn (port, 10, 100);

192#elif defined (TEST_TIMER)

193 test_timer (port, 10, 1000);

194#else

195 # error please define TEST_XXX macro to do something!

196#endif

197

198if (!sig_caught)

199 {

200// Ctrl + D ?

201 g_base.exit (0);

202 printf ("exit ok

");

203 }

204else

205 printf ("has caught Ctrl+C

");

206

207 g_base.fini ();

208 printf ("fini ok

");

209

210 g_base.cleanup ();

211 printf ("cleanup ok

");

212return0;

213 }

 

客户端同样使用了 GEventBase 的派生类 GMyEventBase 来作为事件循环的核心,所不同的是(注意并非之前例子里的那个类,虽然同名),它提供了 clt_handler 来处理自己的业务代码。

另外为了提供连接中断后自动向服务重连的功能,这里 GMyEventBase 派生自 GEventBase 类的子类 GEventBaseWithAutoReconnect (位于 EventBaseAR.h/cpp 中)。

程序的运行是分别调用 GEventBase 的 init / connect / fini / cleaup 方法以及 GEventHandler 的 send / disconnect 来测试读写与连接。

定义宏 TEST_READ 用来测试读写;定义宏 TEST_CONN 可以测试连接的通断及读写;定义宏 TEST_TIMER 来测试周期性定时器及读写。它们是互斥的。

clt_handler 主要用来异步接收服务端的回送数据并打印:

clt_handler.cpp

 1 #include "clt_handler.h"

2

3#ifdef TEST_TIMER

4externvoid do_read (clt_handler *, int);

5bool clt_handler::on_timeout (GEV_PER_TIMER_DATA *gptd)

6{

7 printf ("time out ! id %p, due %d, period %d

", gptd, gptd->due_msec, gptd->period_msec);

8 do_read ((clt_handler *)gptd->user_arg, 1);

9returntrue;

10}

11#endif

12

13void clt_handler::on_read_msg (Json::Value const& val)

14{

15int key = val["key"].asInt ();

16 std::string data = val["data"].asString ();

17 printf ("got %d:%s

", key, data.c_str ());

18 }

 

这个测试程序可以通过在控制台手工输入数据来驱动,也可以通过测试数据文件来驱动,下面的 awk 脚本用来制造符合格式的测试数据:

epoll_gen.awk

 1 #! /bin/awk -f

2BEGIN {

3 WORDNUM = 1000

4for (i = 1; i <= WORDNUM; i++) {

5 printf("%d %s

", randint(WORDNUM), randword(20))

6 }

7}

8

9 # randint(n): return a random integer number which is >= 1 and <= n

10function randint(n) {

11 return int(n *rand()) + 1

12}

13

14 # randlet(): return a random letter, which maybe upper, lower or number.

15function randlet() {

16 return substr("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", randint(62), 1)

17}

18

19# randword(LEN): return a rand word with a length of LEN

20function randword(LEN) {

21 randw=""

22for( j = 1; j <= LEN; j++) {

23 randw=randw randlet()

24 }

25 return randw

26 }

 

生成的测试文件格式如下:

238 s0jKlYkEjwE4q3nNJugF

568 0cgNaSgDpP3VS45x3Wum

996 kRF6SgmIReFmrNBcCecj

398 QHQqCrB5fC61hao1BV2x

945 XZ6KLtA4jZTEnhcAugAM

619 WE95NU7FnsYar4wz279j

549 oVCTmD516yvmtuJB2NG3

840 NDAaL5vpzp8DQX0rLRiV

378 jONIm64AN6UVc7uTLIIR

251 EqSBOhc40pKXhCbCu8Ey

 

整个工程编译的话就是一个 CMakeLists 文件,可以通过 cmake 生成对应的 Makefile 或 VS solution 来编译代码:

CMakeLists.txt

 1 cmake_minimum_required(VERSION 3.0)

2project(epoll_svc)

3 include_directories(../core ../include)

4 set(CMAKE_CXX_FLAGS "-std=c++11 -pthread -g -Wall ${CMAKE_CXX_FLAGS}")

5 link_directories(${PROJECT_SOURCE_DIR}/../lib)

6 set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/../bin)

7

8 add_executable (epoll_svc epoll_svc.cpp svc_handler.cpp ../core/EventBase.cpp ../core/EventHandler.cpp ../core/log.cpp)

9IF (WIN32)

10target_link_libraries(epoll_svc jsoncpp ws2_32)

11ELSE ()

12target_link_libraries(epoll_svc jsoncpp rt)

13ENDIF ()

14

15 add_executable (epoll_clt epoll_clt.cpp clt_handler.cpp ../core/EventBase.cpp ../core/EventBaseAR.cpp ../core/EventHandler.cpp ../core/log.cpp)

16 target_compile_definitions(epoll_clt PUBLIC -D TEST_READ)

17IF (WIN32)

18target_link_libraries(epoll_clt jsoncpp ws2_32)

19ELSE ()

20target_link_libraries(epoll_clt jsoncpp rt)

21ENDIF ()

22

23 add_executable (epoll_local epoll_local.cpp)

24IF (WIN32)

25target_link_libraries(epoll_local jsoncpp ws2_32)

26ELSE ()

27target_link_libraries(epoll_local jsoncpp rt)

28 ENDIF ()

 

 这个项目包含三个编译目标,分别是 epoll_svc 、epoll_clt 与 epoll_local,其中前两个可以跨平台编译,后一个只能在 Linux 平台编译,用来验证 epoll 的一些特性。

编译完成后,首先运行服务端:

>./epoll_svc 1025 

 然后运行客户端:

>./epoll_clt 1025 < demo

测试多个客户端同时连接,可以使用下面的脚本:

epoll_start.sh

1 #! /bin/bash

2 # /bin/sh -> /bin/dash, do not recognize our for loop

3

4for((i=0;i<10;i=i+1))

5do

6 ./epoll_clt 1025 < demo &

7echo"start $i"

8done

 

可以同时启动 10 个客户端。

通过 Ctrl+C 退出服务端;通过 Ctrl+C 或 Ctrl+D 退出单个客户端;

通过下面的脚本来停止多个客户端与服务端:

epoll_stop.sh

1 #! /bin/sh

2 pkill -INT epoll_clt

3sleep1

4 pkill -INT epoll_svc

 


 

框架的用法介绍完之后,再简单游览一下这个库的各层级对外接口。

EventBase.h

  1#pragma once

2

3

4 #include "EventHandler.h"

5 #include <string>

6 #include <map>

7 #include <mutex>

8 #include <condition_variable>

9 #include "thread_group.hpp"

10

11#define GEV_MAX_BUF_SIZE 65536

12

13class GEventBase : public IEventBase

14{

15public:

16 GEventBase();

17 ~GEventBase();

18

19#ifdef WIN32

20virtual HANDLE iocp () const;

21#else

22virtualint epfd () const;

23#endif

24virtualbool post_timer(GEV_PER_TIMER_DATA *gptd);

25virtual GEventHandler* create_handler() = 0;

26

27// thr_num :

28// =0 - no default thread pool, user provide thread and call run

29// <0 - use max(|thr_num|, processer_num)

30// >0 - use thr_num

31bool init(int thr_num = -8, int blksize = GEV_MAX_BUF_SIZE

32#ifndef WIN32

33 , int timer_sig = SIGUSR1

34#endif

35 );

36

37bool listen(unsigned short port, unsigned short backup = 10);

38 GEventHandler* connect(unsigned short port, GEventHandler* exist_handler = NULL);

39// PARAM

40// due_msec: first timeout milliseconds

41// period_msec: later periodically milliseconds

42// arg: user provied argument

43// exist_handler: reuse the timer handler

44//

45// RETURN

46// NULL: failed

47void* timeout(int due_msec, int period_msec, void *arg, GEventHandler *exist_handler);

48bool cancel_timer(void* tid);

49void fini();

50void run();

51void exit(int extra_notify = 0);

52void cleanup();

53

54protected:

55#ifdef WIN32

56bool do_accept(GEV_PER_IO_DATA *gpid);

57bool do_recv(GEV_PER_HANDLE_DATA *gphd, GEV_PER_IO_DATA *gpid);

58void do_error(GEV_PER_HANDLE_DATA *gphd);

59

60int init_socket();

61bool issue_accept();

62bool issue_read(GEV_PER_HANDLE_DATA *gphd);

63bool post_completion(DWORD bytes, ULONG_PTR key, LPOVERLAPPED ol);

64

65#else

66bool do_accept(int fd);

67bool do_recv(conn_key_t key);

68void do_error(conn_key_t key);

69

70bool init_pipe();

71void close_pipe();

72bool post_notify (char ch, void* ptr = nullptr);

73void promote_leader (std::unique_lock<std::mutex> &guard);

74

75 GEventHandler* find_by_key (conn_key_t key, bool erase);

76 GEventHandler* find_by_fd (int fd, conn_key_t &key, bool erase);

77

78# ifdef HAS_SIGTHR

79void sig_proc ();

80# endif

81#endif

82

83bool do_timeout(GEV_PER_TIMER_DATA *gptd);

84

85virtualbool on_accept(GEV_PER_HANDLE_DATA *gphd);

86virtualbool on_read(GEventHandler *h, GEV_PER_IO_DATA *gpid);

87virtualvoid on_error(GEventHandler *h);

88virtualbool on_timeout (GEV_PER_TIMER_DATA *gptd);

89

90

91protected:

92volatilebool m_running = false;

93int m_thrnum = 0;

94int m_blksize = GEV_MAX_BUF_SIZE;

95 std::thread_group m_grp;

96 SOCKET m_listener = INVALID_SOCKET;

97

98 std::mutex m_mutex; // protect m_map

99 std::mutex m_tlock; // protect m_tmap

100// timer_t may conflict when new timer created after old timer closed

101//std::map <timer_t, GEventHandler *> m_tmap;

102 std::map <GEV_PER_TIMER_DATA*, GEventHandler *> m_tmap;

103

104#ifdef WIN32

105 LPFN_ACCEPTEX m_acceptex = nullptr;

106 LPFN_GETACCEPTEXSOCKADDRS m_getacceptexsockaddrs = nullptr;

107 HANDLE m_iocp = NULL;

108 HANDLE m_timerque = NULL;

109

110 std::map<GEV_PER_HANDLE_DATA*, GEventHandler*> m_map;

111#else

112int m_ep = -1;

113int m_pp[2];

114int m_tsig = 0; // signal number for timer

115

116 std::mutex m_lock; // protect epoll

117 pthread_t m_leader = -1;

118 std::map<conn_key_t, GEventHandler*> m_map;

119# ifdef HAS_SIGTHR

120// special thread only cares about signal

121 std::thread *m_sigthr = nullptr;

122# endif

123#endif

124 };

 

  • init,它在底层启动 thr_num 个线程来跑 run 方法;每次 IO 的块缓冲区大小由 blksize 指定;它内部还创建了对应的 iocp 或 epoll 对象,便于之后加入 socket 句柄进行处理。
  • exit,它通知线程池中的所有线程退出等待,windows 上是通过 PostQueuedCompletionStatus,Linux 上是通过在自建的一个 pipe 上写数据以触发 epoll 退出(这个 pipe 在 init 中创建并加入 epoll);
  • fini,它在所有工作线程退出后,关闭之前创建的对象,清理事件循环用到的资源;
  • cleanup,它清理之前建立的 fd-handler 映射,清理遗留的处理器并释放资源;
  • run,它是线程池运行函数,windows 上是通过 GetQueuedCompletionStatus 在 iocp 上等待;在 linux 上是通过 epoll_wait 在 epoll 上等待事件。当有事件产生后,根据事件类型,分别调用 do_accept / on_accept、do_recv / on_read、do_error / on_error 回调来分派事件;
  • listen,创建侦听 socket 并加入到 iocp 或 epoll 中;
  • connect,连接到远程服务并将成功连接的 socket 加入到 iocp 或  epoll 中;
  • timeout,设置定时器事件,windows 上是通过 CreateTimerQueueTimer 实现定时器超时;linux 则是通过 timer_create 实现的,都是系统现成的东西,只不过在系统定时器到期后,给对应的 iocp 或 epoll 对象发送了一个通知而已,在 linux 上这个通知机制是上面提到过的 pipe 来实现的,因而有一定延迟,不能指定精度太小的定时器;
  • cancel_timer,取消之前设置的定时器。

 

然后看下 GEventHandler 提供的回调接口,应用可以从它派生并完成业务相关代码:

EventHandler.h

  1#pragma once

2 #include "platform.h"

3

4#ifdef WIN32

5// must ensure <winsock2.h> precedes <widnows.h> included, to prevent winsock2.h conflict with winsock.h

6 # include <WinSock2.h>

7 # include <Windows.h>

8 # include <mswsock.h> // for LPFN_ACCEPTEX & LPFN_GETACCEPTEXSOCKADDRS later in EventBase.h

9#else

10 # include <unistd.h> // for close

11 # include <sys/socket.h>

12 # include <sys/epoll.h>

13 # include <sys/time.h>

14 # include <netinet/in.h> // for struct sockaddr_in

15 # include <arpa/inet.h> // for inet_addr/inet_ntoa

16 # include <string.h> // for memset/memcpy

17 # include <signal.h>

18#endif

19

20 #include <mutex>

21 #include "jsoncpp/json.h"

22

23

24class GEventHandler;

25struct GEV_PER_TIMER_DATA;

26class IEventBase

27{

28public:

29#ifdef WIN32

30virtual HANDLE iocp () const = 0;

31#else

32virtualint epfd () const = 0;

33#endif

34

35virtualvoid* timeout(int due_msec, int period_msec, void *arg, GEventHandler *exist_handler) = 0;

36virtualbool cancel_timer(void* tid) = 0;

37virtualbool post_timer(GEV_PER_TIMER_DATA *gptd) = 0;

38};

39

40

41#ifdef WIN32

42enum GEV_IOCP_OP

43{

44 OP_TIMEOUT = 1,

45 OP_ACCEPT,

46 OP_RECV,

47};

48#else

49// the purpose of this key is to distinguish different connections with same fd !

50// (when connection break and re-established soon, fd may not change but port will change)

51struct conn_key_t

52{

53int fd;

54 unsigned short lport;

55 unsigned short rport;

56

57 conn_key_t (int f, unsigned short l, unsigned short r);

58booloperator< (struct conn_key_t const& rhs) const;

59};

60#endif

61

62

63struct GEV_PER_HANDLE_DATA

64{

65 SOCKET so;

66 SOCKADDR_IN laddr;

67 SOCKADDR_IN raddr;

68

69#ifndef WIN32

70 conn_key_t key () const;

71#endif

72

73 GEV_PER_HANDLE_DATA(SOCKET s, SOCKADDR_IN *l, SOCKADDR_IN *r);

74virtual ~GEV_PER_HANDLE_DATA();

75};

76

77struct GEV_PER_IO_DATA

78{

79 SOCKET so;

80#ifdef WIN32

81 GEV_IOCP_OP op;

82 OVERLAPPED ol;

83 WSABUF wsa; // wsa.len is buffer length

84 DWORD bytes; // after compeleted, bytes trasnfered

85#else

86char *buf;

87int len;

88#endif

89

90 GEV_PER_IO_DATA(

91#ifdef WIN32

92 GEV_IOCP_OP o,

93#endif

94 SOCKET s, int l);

95virtual ~GEV_PER_IO_DATA();

96};

97

98struct GEV_PER_TIMER_DATA

99#ifdef WIN32

100 : public GEV_PER_IO_DATA

101#endif

102{

103 IEventBase *base;

104int due_msec;

105int period_msec;

106void *user_arg;

107bool cancelled;

108#ifdef WIN32

109 HANDLE timerque;

110 HANDLE timer;

111#else

112 timer_t timer;

113#endif

114

115 GEV_PER_TIMER_DATA(IEventBase *base, int due, int period, void *arg

116#ifdef WIN32

117 , HANDLE tq);

118#else

119 , timer_t tid);

120#endif

121

122virtual ~GEV_PER_TIMER_DATA();

123void cancel ();

124};

125

126class GEventHandler

127{

128public:

129 GEventHandler();

130virtual ~GEventHandler();

131

132 GEV_PER_HANDLE_DATA* gphd();

133 GEV_PER_TIMER_DATA* gptd();

134bool connected();

135void disconnect();

136void clear();

137 SOCKET fd();

138

139int send(charconst* buf, int len);

140int send(std::stringconst& str);

141

142virtualbool reuse();

143virtualbool auto_reconnect();

144virtualvoid arg(void *param) = 0;

145virtualvoid reset(GEV_PER_HANDLE_DATA *gphd, GEV_PER_TIMER_DATA *gptd, IEventBase *base);

146virtualbool on_read(GEV_PER_IO_DATA *gpid) = 0;

147virtualvoid on_error(GEV_PER_HANDLE_DATA *gphd);

148// note when on_timeout called, handler"s base may cleared by cancel_timer, use gptd->base instead if it is not null.

149virtualbool on_timeout(GEV_PER_TIMER_DATA *gptd) = 0;

150virtualvoid cleanup(bool terminal);

151void close(bool terminal);

152

153protected:

154 GEV_PER_HANDLE_DATA *m_gphd = nullptr;

155 GEV_PER_TIMER_DATA *m_gptd = nullptr;

156 IEventBase *m_base = nullptr;

157// us so instead of m_gphd,

158// as the later one may destroyed during using..

159 SOCKET m_so;

160};

161

162// a common handler to process json protocol.

163class GJsonEventHandler : public GEventHandler

164{

165public:

166//virtual void on_read();

167virtualvoid arg(void *param);

168virtualvoid reset(GEV_PER_HANDLE_DATA *gphd, GEV_PER_TIMER_DATA *gptd, IEventBase *base);

169virtualbool on_read(GEV_PER_IO_DATA *gpid);

170virtualvoid on_read_msg(Json::Value const& root) = 0;

171virtualbool on_timeout(GEV_PER_TIMER_DATA *gptd);

172virtualvoid cleanup(bool terminal);

173

174protected:

175// protect m_stub to prevent multi-entry

176#ifdef HAS_ET

177 std::mutex m_mutex;

178#endif

179

180 std::string m_stub;

181 };

 

这里主要有两个类,GEventHandler 处理通用的基于流的数据;GJsonEventHandler 处理基于 json 格式的数据。

前者需要重写 on_read 方法来处理块数据;后者需要重写 on_read_msg 方法来处理 json 数据。

目前 json 的解析是通过 jsoncpp 库完成的,这个库本身是跨平台的(本 git 库仅提供 64 位 Linux 静态链接库及 VS2013 的 32 位 Release 版本 Windows 静态库)。

svc_handler 与 clt_handler  均从 GJsonEventHandler 派生。

如果有新的流格式需要处理 ,只需要从 GEventHandler 类派生新的处理类即可。

 

除了读取连接上的数据,还有其它一些重要的回调接口,列明如下:

  • on_read,连接上有数据到达;
  • on_error,连接断开;
  • on_tmeout,定时器事件;
  • ……

如果有新的事件需要处理 ,也可以在这里扩展。

最后看下 GEventBaseWithAutoReconnect 提供的与自动重连相关的接口:

EventBaseAR.h

 1#pragma once

2

3

4 #include "EventBase.h"

5 #include <thread>

6

7#define GEV_RECONNECT_TIMEOUT 2 // seconds

8#define GEV_MAX_RECONNECT_TIMEOUT 256 // seconds

9

10class GEventBaseWithAutoReconnect : public GEventBase

11{

12public:

13 GEventBaseWithAutoReconnect(int reconn_min = GEV_RECONNECT_TIMEOUT, int reconn_max = GEV_MAX_RECONNECT_TIMEOUT);

14 ~GEventBaseWithAutoReconnect();

15

16bool do_connect(unsigned short port, void *arg);

17 GEventHandler* connector();

18

19protected:

20virtualvoid on_error(GEventHandler *h);

21virtualbool on_timeout(GEV_PER_TIMER_DATA *gptd);

22

23virtualvoid on_connect_break();

24virtualbool on_connected(GEventHandler *app);

25

26protected:

27void do_reconnect(void *arg);

28

29protected:

30 unsigned short m_port;

31 GEventHandler* m_app;

32 GEventHandler* m_htimer;

33void* m_timer;

34int m_reconn_min;

35int m_reconn_max;

36int m_reconn_curr;

37 };

 

其实比较简单,只比 GEventBase 类多了一个  do_connect 方法,来扩展 connect 不能自动重连的问题。

底层的话,是通过定时器来实现指数后退重连算法的。

 


 

后记

这个框架已经应用到我司的公共产品中,并为数个 tcp 服务提供底层支撑,经过百万级别用户机器验证,运行稳定性还是可以的,所以当得起“工业级”这三个字。

 

前面在说到开源库的选型时还留了一个口子没有交待,这里一并说下。

其实最早的重构版本是使用 libevent 来实现的,但是发现它在 windows 上使用的是低效的 select,

而且为了增加、删除句柄,它又使用了一种 self-pipe-trick 的技巧,简单说来的就是下面的代码序列:

listen (listen_fd, 1); 

……

connect (connect_fd, &addr, size);

……

accept_fd = accept (listen_fd, &addr, &size);

 

在缺乏 pipe 调用的 win32 环境制造了一个 socket 自连接,从而进行一些通知。

这一步是必要的,如果不能成功连接就会导致整个 libevent 初始化失败,从而运行不起来。

不巧的是,在一些 windows 机器上(约占用户总量 10%),由于防火墙设置严格,上述 listen 与 connect 调用可以成功,

但是 accept 会失败返回,从而导致整个服务退出 (防火墙会严格禁止不在白名单上侦听的端口的连接)。

对于已知端口,可以通过在防火墙上设置白名单来避免,但是对于这种随机 listen 的端口,真的是太难了,基本无解。

 

回头考察了一下 asio,windows 上使用的是 iocp,自然没有这个自连接;

ACE 有多种实现可供选择,如果使用  ACE_Select_Reactor / ACE_TP_Reactor 是会有这个自连接,

但是你可以选择其它实现,如基于 WaitForMultipleEvents 的 ACE_WFMO_Reactor(最大只支持 62 个句柄,放弃),

或基于 iocp 的 ACE_Proactor (前摄式,与反应式在编程上稍有不同,更接近于 asio)就没有这个自连接。

 

再说的深一点,其实公司最早的网络库使用的就是基于 boost 的 asio,大量的使用了 c++ 模板,

有时候产生了一些崩溃,但是根据 dump 完全无法定位崩溃点(各种冗长的模板展开名称),

导致了一些顽固的已知 bug 一起找不到崩溃点而无法解决(虽然量不大),所以才有了要去重新选型网络库以及后来这一系列的东西。

 

本来一开始我是想用 ACE 的,因为我读过这个库的源码,对里面所有的东西都非常熟悉,

但是看看 ACE 小 5 MB 的 dll 尺寸,还是放弃了(产品本身安装包也就这么大吧),

对于一个公司底层的公共组件,被各种产品携带,需要严格控制“体重”

(后来听说 ACE 按功能拆分了代码模块,你只需要选自己依赖的部分即可,不过我还没有试过)。

 

使用这个库代替之前的 boost::asio 后,我还有一个意外收获,就是编译出来的 dll 尺寸明显小了很多,700 K -> 500 K 的样子,看来所谓模板膨胀是真有其事……

 

最后奉上 gevent 的 github 链接,欢迎有相同需求的小伙伴前来“复刻” :

https://github.com/goodpaperman/gevent

 

原文链接:https://www.cnblogs.com/goodcitizen/archive/2020/05/31/12349909.html

以上是 一个工业级、跨平台、轻量级的tcp网络服务框架:gevent 的全部内容, 来源链接: utcz.com/z/516986.html

回到顶部