Starting with 1.56, boost/asio
provides asio::spawn()
to work with coroutines. The sample code is pasted below, with minor modifications:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <iostream>

using boost::asio::ip::tcp;

/// One client connection of the echo server.
///
/// go() spawns two coroutines on the same strand: echo(), which reads data and
/// writes it back, and timeout(), which closes the socket once the deadline
/// that echo() keeps re-arming has passed. Each coroutine is bound to a
/// shared_ptr obtained from shared_from_this(), so the session must be owned
/// by a boost::shared_ptr before go() is called.
class session : public boost::enable_shared_from_this<session> {
public:
    /// Creates the socket, timer and strand on the given io_service.
    explicit session(boost::asio::io_service &io_service)
        : socket_(io_service), timer_(io_service), strand_(io_service) {
    }

    /// Socket of this session; the acceptor accepts the connection into it.
    tcp::socket &socket() {
        return socket_;
    }

    /// Starts the echo and timeout coroutines for this session.
    void go() {
        boost::asio::spawn(strand_, boost::bind(&session::echo, shared_from_this(), _1));
        boost::asio::spawn(strand_, boost::bind(&session::timeout, shared_from_this(), _1));
    }

private:
    /// Reads up to 128 bytes at a time and writes them back to the peer.
    /// The 10 second deadline is re-armed before every read.
    void echo(boost::asio::yield_context yield) {
        try {
            char data[128];
            while (true) {
                timer_.expires_from_now(boost::posix_time::seconds(10));
                std::size_t n = socket_.async_read_some(boost::asio::buffer(data), yield);
                boost::asio::async_write(socket_, boost::asio::buffer(data, n), yield);
            }
        } catch (const std::exception &) {
            // Read/write failed or the socket was closed by timeout().
            close_quietly();
        }
    }

    /// Waits on the timer and closes the socket once the deadline has passed.
    void timeout(boost::asio::yield_context yield) {
        while (socket_.is_open()) {
            boost::system::error_code ignored_ec;
            // The wait also completes when echo() re-arms or cancels the
            // timer, so the deadline is re-checked after every wake-up.
            timer_.async_wait(yield[ignored_ec]);
            if (timer_.expires_from_now() <= boost::posix_time::seconds(0)) {
                // Non-throwing overload: a close failure must not escape the
                // coroutine and propagate out of io_service::run().
                socket_.close(ignored_ec);
            }
        }
    }

    /// Closes the socket and cancels the timer without throwing, so that both
    /// steps always run and no exception leaves the coroutine.
    void close_quietly() {
        boost::system::error_code ignored_ec;
        socket_.close(ignored_ec);
        timer_.cancel(ignored_ec);
    }

    tcp::socket socket_;
    boost::asio::deadline_timer timer_;
    boost::asio::io_service::strand strand_;
};

/// Accept loop, run as a coroutine: listens on the given IPv4 port and starts
/// a session for every connection that is accepted successfully. A failed
/// accept drops the unused session object and the loop tries again.
void do_accept(boost::asio::io_service &io_service, unsigned short port,
               boost::asio::yield_context yield) {
    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), port));
    while (true) {
        boost::system::error_code ec;
        boost::shared_ptr<session> new_session(new session(io_service));
        acceptor.async_accept(new_session->socket(), yield[ec]);
        if (!ec) {
            new_session->go();
        }
    }
}

/// Spawns the accept coroutine on port 2222 and runs the io_service.
/// Any exception that reaches run() is reported on stderr.
int main() {
    try {
        boost::asio::io_service io_service;
        boost::asio::spawn(io_service,
                           boost::bind(do_accept, boost::ref(io_service), 2222, _1));
        io_service.run();
    } catch (const std::exception &e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }
    return 0;
}
The Python code from my previous article can be used to test the code above. I also tried to write a TCP server using only the boost::coroutines
classes. select()
is used, since I want the code to be platform independent. NOTE: with coroutines, we have only _one_ thread.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
#ifdef _WIN32 #include <winsock2.h> #include <ws2tcpip.h> #include <windows.h> #pragma comment(lib, "ws2_32.lib") #pragma warning(disable: 4996) #define sock_send(s, str, len) send(s, str, len, 0) #define sock_close(s) closesocket(s) #else #include <arpa/inet.h> #include <sys/types.h> #include <sys/socket.h> #define sock_send(s, str, len) send(s, str, len, MSG_NOSIGNAL) #define sock_close(s) close(s) #endif #include <cerrno> #include <cstdio> #include <cstring> #include <iostream> #include <list> #include <boost/bind.hpp> #include <boost/coroutine/all.hpp> #include <boost/shared_ptr.hpp> using namespace std; #ifdef _WIN32 struct Win32SocketWrapper { Win32SocketWrapper() { WSADATA wsaData; WSAStartup(0x0202, &wsaData); } ~Win32SocketWrapper() { WSACleanup(); } } g_win32_socket_wrapper; #endif class session { typedef boost::coroutines::symmetric_coroutine<void> coro_t; public: explicit session(int sock) : socket_(sock) { echo_coro_ = coro_t::call_type(boost::bind(&session::echo, this, _1)); } int socket() { return socket_; } void go() { echo_coro_(); } void echo(coro_t::yield_type &yield) { int rc; char buffer[128]; while (true) { memset(buffer, 0, sizeof(buffer)); yield(); rc = recv(socket_, buffer, sizeof(buffer), 0); if (rc == 0 || rc == -1) { /* close or error */ printf("socket[%d] closed, rc=%d..\n", socket_, rc); sock_close(socket_); socket_ = -1; /* do not release here, or the whole coroutine context will be invalid.. 
*/ break; } else { sock_send(socket_, buffer, rc); } } } private: int socket_; coro_t::call_type echo_coro_; }; void event_loop(int server_sock) { list<boost::shared_ptr<session> > session_list; int rc, maxfd, client_sock; fd_set rdset; struct sockaddr_in client_addr; size_t addr_size = sizeof(struct sockaddr_in); while (true) { FD_ZERO(&rdset); FD_SET(server_sock, &rdset); maxfd = server_sock; list<boost::shared_ptr<session> >::iterator it = session_list.begin(); while (it != session_list.end()) { if ((*it)->socket() == -1) { session_list.erase(it++); } else { FD_SET((*it)->socket(), &rdset); if (maxfd < (*it)->socket()) { maxfd = (*it)->socket(); } ++it; } } /* max fd value plus 1 */ rc = select(maxfd+1, &rdset, 0, 0, NULL); if (rc == -1) { continue; } else { if (FD_ISSET(server_sock, &rdset)) { client_sock = (int)accept(server_sock, (struct sockaddr *)&client_addr, (socklen_t *)&addr_size); printf("socket[%d] accepted: %s:%d..\n", client_sock, inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port)); boost::shared_ptr<session> new_session(new session(client_sock)); new_session->go(); /* go first */ session_list.push_back(new_session); } for (list<boost::shared_ptr<session> >::iterator it = session_list.begin(); it != session_list.end(); ++it) { if (FD_ISSET((*it)->socket(), &rdset)) { (*it)->go(); } } } } } int main() { int rc, server_sock; struct sockaddr_in server_addr; server_sock = (int)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = htonl(INADDR_ANY); server_addr.sin_port = htons(2222); rc = bind(server_sock, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_in)); if (rc < 0) { fprintf(stderr, "bind: %s.\n", strerror(errno)); return -1; } listen(server_sock, 5); /* loop */ event_loop(server_sock); sock_close(server_sock); return 0; } |