diff options
-rw-r--r-- | .gitmodules | 3 | ||||
-rw-r--r-- | Makefile | 22 | ||||
m--------- | boost-asio-fastbuffer | 0 | ||||
-rw-r--r-- | main.cpp | 11 | ||||
-rw-r--r-- | socks5.cpp | 699 | ||||
-rw-r--r-- | socks5.hpp | 253 |
6 files changed, 724 insertions, 264 deletions
diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..9980dbe --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "boost-asio-fastbuffer"] + path = boost-asio-fastbuffer + url = https://github.com/utoni/boost-asio-fastbuffer.git @@ -1,4 +1,5 @@ CXX = g++ +GIT = git CXXFLAGS = -Wall -Wextra SERVER_HDRS = socks5.hpp SERVER_SRCS = socks5.cpp main.cpp @@ -11,15 +12,28 @@ CXXFLAGS += -fsanitize=thread -fsanitize=undefined endif endif ifneq ($(DEBUG),) -CXXFLAGS += -g #-DBOOST_ASIO_ENABLE_HANDLER_TRACKING=1 +CXXFLAGS += -g3 #-DBOOST_ASIO_ENABLE_HANDLER_TRACKING=1 +else +CXXFLAGS += -O3 -fomit-frame-pointer -flto +endif +ifneq ($(HANDLER_TRACKING),) +CXXFLAGS += -DBOOST_ASIO_ENABLE_HANDLER_TRACKING=1 endif -all: server +all: git server + +git: boost-asio-fastbuffer/fastbuffer.hpp -server: $(SERVER_HDRS) $(SERVER_SRCS) +boost-asio-fastbuffer/fastbuffer.hpp: + $(GIT) submodule update --init + +server: boost-asio-fastbuffer/fastbuffer.hpp $(SERVER_HDRS) $(SERVER_SRCS) $(CXX) $(CXXFLAGS) $(SERVER_SRCS) -o $@ clean: rm -f server -.PHONY: clean +distclean: clean + $(GIT) submodule deinit --all --force + +.PHONY: all git clean distclean diff --git a/boost-asio-fastbuffer b/boost-asio-fastbuffer new file mode 160000 +Subproject 06a06a622d2d148e3258a815dade04b86d90093 @@ -1,14 +1,25 @@ #include "socks5.hpp" #include <boost/asio/io_context.hpp> +#include <iostream> #include <thread> int main() { + std::cout << "SOCKS5::ProxyServer listen on 127.0.0.1:1080\n" + << "SOCKS5::LoggingProxyServer listen on 127.0.0.1:1081\n" + << "SOCKS5::CustomProtocolProxyServer listen on 127.0.0.1:1082\n"; + boost::asio::io_context ioc; auto server = SOCKS5::ProxyServer(ioc, "127.0.0.1", 1080); + auto logging_server = SOCKS5::LoggingProxyServer(ioc, "127.0.0.1", 1081); + auto custom_protocol_server = + SOCKS5::CustomProtocolProxyServer(ioc, "127.0.0.1", 1082); auto threads = std::vector<std::thread>(); server.start(); + logging_server.start(); + custom_protocol_server.start(); + for (size_t i = 0; i < 4; ++i) { threads.emplace_back([&ioc]() { ioc.run(); }); } @@ -13,43 +13,94 @@ #include <boost/system/detail/error_code.hpp> #include <cstddef> #include <cstdint> +#include <iostream> #include <memory> +#include <mutex> #include <string> using namespace SOCKS5; +using namespace boost; +using namespace boost::asio; using boost::asio::io_context; using boost::asio::ip::tcp; +using namespace boost::system; -ProxySessionBase::ProxySessionBase(std::uint32_t session_id, - tcp::socket &&client_socket, - std::size_t buffer_size) +static std::mutex g_loggingMutex; + +template <typename Executor> +void AsyncDestinationSocket<Executor>::do_connect_tcp( + boost::asio::ip::tcp::resolver::results_type::const_iterator &it, + std::function<void(system::error_code)> handler) { + m_socket.emplace(tcp::socket(m_strand)); + auto &tcp_socket = boost::get<tcp::socket>(*m_socket); + tcp_socket.async_connect(it->endpoint(), std::move(handler)); +} + +template <typename Executor> +bool AsyncDestinationSocket<Executor>::do_read( + BufferBase &buffer, + std::function<void(system::error_code, std::size_t)> handler) { + if (m_socket) { + if (auto *s = boost::get<tcp::socket>(&*m_socket)) { + s->async_read_some(+buffer, std::move(handler)); + return true; + } + } + return false; +} + +template <typename Executor> +bool AsyncDestinationSocket<Executor>::do_write( + BufferBase &buffer, std::size_t length, + std::function<void(system::error_code, std::size_t)> handler) { + if (m_socket) { + if (auto *s = boost::get<tcp::socket>(&*m_socket)) { + async_write(*s, -buffer, transfer_exactly(length), std::move(handler)); + return true; + } + } + return false; +} + +template <typename Executor> bool AsyncDestinationSocket<Executor>::cancel() { + if (m_socket) { + if (auto *s = boost::get<tcp::socket>(&*m_socket)) { + s->cancel(); + return true; + } + } + return false; +} + +ProxyBase::ProxyBase(std::uint32_t session_id, tcp::socket &&client_socket, + std::size_t buffer_size) : m_sessionId{session_id}, m_inBuf{buffer_size}, m_outBuf{buffer_size}, m_clientSocket{std::move(client_socket)} {} -ProxySessionBase::ProxySessionBase(std::uint32_t session_id, - boost::asio::ip::tcp::socket &&client_socket, - StreamBuffer &&input_buffer, - StreamBuffer &&output_buffer) +ProxyBase::ProxyBase(std::uint32_t session_id, tcp::socket &&client_socket, + ContiguousStreamBuffer &&input_buffer, + ContiguousStreamBuffer &&output_buffer) : m_sessionId{session_id}, m_inBuf{std::move(input_buffer)}, m_outBuf{std::move(output_buffer)}, m_clientSocket{std::move(client_socket)} {} -ProxySessionAuth::ProxySessionAuth(std::uint32_t session_id, - tcp::socket &&client_socket) - : ProxySessionBase(session_id, std::move(client_socket), 32), - m_resolver{m_clientSocket.get_executor()}, - m_destinationSocket{m_clientSocket.get_executor()} {} - -void ProxySessionAuth::start() { - boost::asio::async_read( - m_clientSocket, +m_inBuf, boost::asio::transfer_exactly(2), - boost::bind(&ProxySessionAuth::recv_client_greeting, shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); +ProxyAuth::ProxyAuth(std::uint32_t session_id, tcp::socket &&client_socket) + : ProxyBase(session_id, std::move(client_socket), 512), + session_buffer_size{0} {} + +void ProxyAuth::start_internal() { + BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__)); + + async_read(m_clientSocket, +m_inBuf, transfer_at_least(2), + boost::bind(&ProxyAuth::recv_client_greeting, shared_from_this(), + asio::placeholders::error, + asio::placeholders::bytes_transferred)); } -void ProxySessionAuth::recv_client_greeting(const boost::system::error_code &ec, - std::size_t length) { +void ProxyAuth::recv_client_greeting(const system::error_code &ec, + std::size_t length) { + BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__)); + if (ec || length == 0) return; m_inBuf += length; @@ -58,15 +109,15 @@ void ProxySessionAuth::recv_client_greeting(const boost::system::error_code &ec, return; const std::size_t expected_size = std::size_t(2) + m_inBuf[1]; if (m_inBuf.size() < expected_size) { - boost::asio::async_read( - m_clientSocket, +m_inBuf, boost::asio::transfer_exactly(expected_size - m_inBuf.size()), - boost::bind(&ProxySessionAuth::recv_client_greeting, shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + async_read(m_clientSocket, +m_inBuf, + transfer_at_least(expected_size - m_inBuf.size()), + boost::bind(&ProxyAuth::recv_client_greeting, shared_from_this(), + asio::placeholders::error, + asio::placeholders::bytes_transferred)); return; } - auto found_no_auth = false; // only No Authentication supported + auto found_no_auth = false; // only "No Authentication" supported for (auto i = std::size_t(2); i < expected_size; ++i) { if (m_inBuf[i] == 0x00) { found_no_auth = true; @@ -78,42 +129,44 @@ void ProxySessionAuth::recv_client_greeting(const boost::system::error_code &ec, send_server_greeting(found_no_auth); } -void ProxySessionAuth::send_server_greeting(bool auth_supported) { +void ProxyAuth::send_server_greeting(bool auth_supported) { + BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__)); + if (!auth_supported) { m_outBuf += {0x05, 0xFF}; m_clientSocket.async_send( - -m_outBuf, - boost::bind(&ProxySessionAuth::handle_write, shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + -m_outBuf, boost::bind(&ProxyAuth::handle_write, shared_from_this(), + asio::placeholders::error, + asio::placeholders::bytes_transferred)); return; } m_outBuf += {0x05, 0x00}; m_clientSocket.async_send( - -m_outBuf, - boost::bind(&ProxySessionAuth::handle_write, shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + -m_outBuf, boost::bind(&ProxyAuth::handle_write, shared_from_this(), + asio::placeholders::error, + asio::placeholders::bytes_transferred)); process_connection_request(); } -void ProxySessionAuth::recv_connection_request( - const boost::system::error_code &ec, std::size_t length) { +void ProxyAuth::recv_connection_request(const system::error_code &ec, + std::size_t length) { + BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__)); + if (ec || length == 0) return; m_inBuf += length; - process_connection_request(); } -void ProxySessionAuth::process_connection_request() { +void ProxyAuth::process_connection_request() { + BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__)); + if (m_inBuf.size() < 6) { - boost::asio::async_read( - m_clientSocket, +m_inBuf, boost::asio::transfer_exactly(6 - m_inBuf.size()), - boost::bind(&ProxySessionAuth::recv_connection_request, - shared_from_this(), boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + async_read(m_clientSocket, +m_inBuf, transfer_at_least(6 - m_inBuf.size()), + boost::bind(&ProxyAuth::recv_connection_request, + shared_from_this(), asio::placeholders::error, + asio::placeholders::bytes_transferred)); return; } @@ -142,12 +195,11 @@ void ProxySessionAuth::process_connection_request() { expected_size += std::size_t(6) + address_size; if (m_inBuf.size() < expected_size) { - boost::asio::async_read( - m_clientSocket, +m_inBuf, - boost::asio::transfer_exactly(expected_size - m_inBuf.size()), - boost::bind(&ProxySessionAuth::recv_connection_request, - shared_from_this(), boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + async_read(m_clientSocket, +m_inBuf, + transfer_at_least(expected_size - m_inBuf.size()), + boost::bind(&ProxyAuth::recv_connection_request, + shared_from_this(), asio::placeholders::error, + asio::placeholders::bytes_transferred)); return; } @@ -155,221 +207,278 @@ void ProxySessionAuth::process_connection_request() { switch (m_inBuf[3]) { case 0x01: { auto ip4_bytes = ::ntohl(*reinterpret_cast<const uint32_t *>(m_inBuf(4))); - m_endpoint = tcp::endpoint(boost::asio::ip::make_address_v4(ip4_bytes), - ::ntohs(*reinterpret_cast<const uint16_t *>( - m_inBuf(4 + address_size)))); + auto host = ip::make_address_v4(ip4_bytes); + auto port = + ntohs(*reinterpret_cast<const uint16_t *>(m_inBuf(4 + address_size))); + tcp::endpoint direct_endpoint(std::move(host), port); + m_tcp_resolver_results = + tcp::resolver::results_type::create(std::move(direct_endpoint), "", ""); + m_tcp_resolver_iter = m_tcp_resolver_results.cbegin(); connect_to_destination(proxy_cmd); - return; + break; } case 0x03: { auto host = std::string_view(reinterpret_cast<const char *>(m_inBuf(5)), address_size); - auto port = - ::ntohs(*reinterpret_cast<const uint16_t *>(m_inBuf(5 + address_size))); - resolve_destination_host(proxy_cmd, host, port); - return; + auto port = ntohs( + (*reinterpret_cast<const uint8_t *>(m_inBuf(5 + address_size)) << 0) | + (*reinterpret_cast<const uint8_t *>(m_inBuf(6 + address_size)) << 8)); + tcp::endpoint direct_endpoint(ip::address_v4(), port); + m_tcp_resolver_results = tcp::resolver::results_type::create( + std::move(direct_endpoint), std::string(host), ""); + m_tcp_resolver_iter = m_tcp_resolver_results.cbegin(); + resolve_tcp_destination_host(proxy_cmd, host, port); + break; } case 0x04: { auto ip6_array = *reinterpret_cast<const std::array<std::uint8_t, 16> *>(m_inBuf(4)); - m_endpoint = tcp::endpoint(boost::asio::ip::make_address_v6(ip6_array), - ::ntohs(*reinterpret_cast<const uint16_t *>( - m_inBuf(4 + address_size)))); + auto host = ip::make_address_v6(ip6_array); + auto port = + ntohs(*reinterpret_cast<const uint16_t *>(m_inBuf(4 + address_size))); + tcp::endpoint direct_endpoint(std::move(host), port); + m_tcp_resolver_results = + tcp::resolver::results_type::create(std::move(direct_endpoint), "", ""); + m_tcp_resolver_iter = m_tcp_resolver_results.cbegin(); connect_to_destination(proxy_cmd); - return; + break; } default: - return; + break; } + + m_inBuf -= expected_size; } -void ProxySessionAuth::send_server_response(std::uint8_t proxy_cmd, - std::uint8_t status_code) { +void ProxyAuth::send_server_response(std::uint8_t proxy_cmd, + std::uint8_t status_code) { + BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__)); + m_outBuf += {0x05, status_code, 0x00}; - // TODO: Set DNS domain if available - if (m_endpoint.address().is_v4()) { - const uint32_t addr = ::htonl(m_endpoint.address().to_v4().to_uint()); + + auto tcp_endpoint = m_tcp_resolver_iter->endpoint(); + auto tcp_address = tcp_endpoint.address(); + if (m_tcp_resolver.has_value()) { + const auto tcp_hostname = m_tcp_resolver_iter->host_name(); + m_outBuf += {0x03, static_cast<uint8_t>(tcp_hostname.length())}; + m_outBuf += tcp_hostname; + } else if (tcp_address.is_v4()) { + const uint32_t addr = ::htonl(tcp_address.to_v4().to_uint()); m_outBuf += {0x01, static_cast<uint8_t>(addr & 0x000000FF), static_cast<uint8_t>((addr & 0x0000FF00) >> 8), static_cast<uint8_t>((addr & 0x00FF0000) >> 16), static_cast<uint8_t>((addr & 0xFF000000) >> 24)}; } else { m_outBuf += {0x04}; - const auto addr = m_endpoint.address().to_v6().to_bytes(); + const auto addr = tcp_address.to_v6().to_bytes(); for (const auto byte : addr) m_outBuf += {byte}; } - const auto port = ::htons(m_endpoint.port()); - m_outBuf += {static_cast<uint8_t>(port & 0x00FF), - static_cast<uint8_t>((port & 0xFF00) >> 8)}; - m_clientSocket.async_send( - -m_outBuf, - boost::bind(&ProxySessionAuth::handle_response_write, shared_from_this(), - proxy_cmd, status_code, boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + const auto port = htons(tcp_endpoint.port()); + m_outBuf += std::initializer_list<unsigned char>{ + static_cast<uint8_t>(port & 0x00FF), + static_cast<uint8_t>((port & 0xFF00) >> 8)}; + + m_clientSocket.async_send(-m_outBuf, + boost::bind(&ProxyAuth::handle_response_write, + shared_from_this(), proxy_cmd, + status_code, asio::placeholders::error, + asio::placeholders::bytes_transferred)); } -void ProxySessionAuth::resolve_destination_host(std::uint8_t proxy_cmd, - const std::string_view &host, - std::uint16_t port) { - m_resolver.async_resolve(host, std::to_string(port), - [this, self = shared_from_this(), - proxy_cmd](const boost::system::error_code &ec, - const tcp::resolver::iterator &it) { - if (ec) { - send_server_response(proxy_cmd, 0x04); - return; - } - /* TODO: Support iterating and connecting to - * multiple resolved hosts on failure. */ - m_endpoint = *it; - connect_to_destination(proxy_cmd); - }); +void ProxyAuth::resolve_tcp_destination_host(std::uint8_t proxy_cmd, + const std::string_view &host, + std::uint16_t port) { + BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__)); + + m_tcp_resolver.emplace(m_clientSocket.get_executor()); + m_tcp_resolver->async_resolve( + host, std::to_string(port), + [this, self = shared_from_this(), + proxy_cmd](const system::error_code &ec, + boost::asio::ip::tcp::resolver::results_type res) { + if (ec) { + send_server_response(proxy_cmd, 0x04); + return; + } + m_tcp_resolver_results = std::move(res); + m_tcp_resolver_iter = m_tcp_resolver_results.cbegin(); + connect_to_destination(proxy_cmd); + }); } -void ProxySessionAuth::connect_to_destination(std::uint8_t proxy_cmd) { +void ProxyAuth::connect_to_destination(std::uint8_t proxy_cmd) { + BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__)); + + const auto check_error = [this, proxy_cmd](const system::error_code &ec) { + if (ec) { + auto tmp_iter = m_tcp_resolver_iter; + if (++tmp_iter != m_tcp_resolver_results.cend()) { + m_tcp_resolver_iter = tmp_iter; + return connect_to_destination(proxy_cmd); + } + + if (ec == system::errc::connection_refused) + send_server_response(proxy_cmd, 0x05); + else if (ec == system::errc::network_unreachable) + send_server_response(proxy_cmd, 0x03); + else if (ec == system::errc::host_unreachable) + send_server_response(proxy_cmd, 0x04); + else + send_server_response(proxy_cmd, 0x01); + return; + } + send_server_response(proxy_cmd, 0x00); + }; + + if (!m_destinationSocket) { + m_destinationSocket = m_getDestinationSocket(m_clientSocket.get_executor()); + if (!m_destinationSocket) + return send_server_response(proxy_cmd, 0x01); + } + switch (proxy_cmd) { case 0x01: // TCP client connection { - m_destinationSocket.async_connect( - m_endpoint, - [this, self = shared_from_this(), proxy_cmd](const boost::system::error_code &ec) { - if (ec) { - send_server_response(proxy_cmd, 0x04); - return; - } - send_server_response(proxy_cmd, 0x00); + + m_destinationSocket->connect_tcp( + m_tcp_resolver_iter, + [self = shared_from_this(), check_error](const system::error_code &ec) { + check_error(ec); }); return; } - case 0x02: // TCP port bind + case 0x02: // TCP port bind (not implemented) { - send_server_response(proxy_cmd, 0x02); - return; + return send_server_response(proxy_cmd, 0x07); } - case 0x03: // UDP port bind + case 0x03: // UDP port bind (not implemented) { - send_server_response(proxy_cmd, 0x02); - return; + return send_server_response(proxy_cmd, 0x07); } default: - send_server_response(proxy_cmd, 0x01); - return; + return send_server_response(proxy_cmd, 0x07); } } -void ProxySessionAuth::handle_write(const boost::system::error_code &ec, - std::size_t length) { +void ProxyAuth::handle_write(const system::error_code &ec, std::size_t length) { + BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__)); + if (ec || length == 0) m_clientSocket.cancel(); m_outBuf -= length; if (m_outBuf.size() > 0) m_clientSocket.async_send( - -m_outBuf, - boost::bind(&ProxySessionAuth::handle_write, shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + -m_outBuf, boost::bind(&ProxyAuth::handle_write, shared_from_this(), + asio::placeholders::error, + asio::placeholders::bytes_transferred)); } -void ProxySessionAuth::handle_response_write( - std::uint8_t proxy_cmd, std::uint8_t status_code, - const boost::system::error_code &ec, std::size_t length) { +void ProxyAuth::handle_response_write(std::uint8_t proxy_cmd, + std::uint8_t status_code, + const system::error_code &ec, + std::size_t length) { + BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__)); + if (ec || length == 0) m_clientSocket.cancel(); m_outBuf -= length; if (m_outBuf.size() > 0) { m_clientSocket.async_send( - -m_outBuf, boost::bind(&ProxySessionAuth::handle_response_write, - shared_from_this(), proxy_cmd, status_code, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + -m_outBuf, + boost::bind(&ProxyAuth::handle_response_write, shared_from_this(), + proxy_cmd, status_code, asio::placeholders::error, + asio::placeholders::bytes_transferred)); return; } - if (status_code == 0x00) - switch (proxy_cmd) { - case 0x01: // TCP client connection - { - auto tcp_session = std::make_shared<ProxySessionTcp>( + if (status_code == 0x00) { + std::shared_ptr<ProxySession> session = nullptr; + if (session_buffer_size) + session = std::make_shared<ProxySession>( m_sessionId, std::move(m_clientSocket), - std::move(m_destinationSocket)); - tcp_session->start(); - break; - } - case 0x02: // TCP port bind - { - return; - } - case 0x03: // UDP port bind - { - return; - } - default: + std::move(m_destinationSocket), session_buffer_size); + else + session = std::make_shared<ProxySession>( + m_sessionId, std::move(m_clientSocket), + std::move(m_destinationSocket), std::move(m_inBuf), + std::move(m_outBuf)); + if (!session) { + m_clientSocket.cancel(); return; } + session->start(); + } } -ProxySessionTcp::ProxySessionTcp( - std::uint32_t session_id, boost::asio::ip::tcp::socket &&client_socket, - boost::asio::ip::tcp::socket &&destination_socket, std::size_t buffer_size) - : ProxySessionBase(session_id, std::move(client_socket), buffer_size), - m_destinationSocket(std::move(destination_socket)) {} - -void ProxySessionTcp::start() { recv_from_both(); } - -void ProxySessionTcp::recv_from_both() { +ProxySession::ProxySession(std::uint32_t session_id, + tcp::socket &&client_socket, + std::shared_ptr<DestinationSocketBase> &&dest_socket, + std::size_t buffer_size) + : ProxyBase(session_id, std::move(client_socket), buffer_size), + m_destinationSocket(std::move(dest_socket)) {} + +ProxySession::ProxySession(std::uint32_t session_id, + tcp::socket &&client_socket, + std::shared_ptr<DestinationSocketBase> &&dest_socket, + ContiguousStreamBuffer &&input_buffer, + ContiguousStreamBuffer &&output_buffer) + : ProxyBase(session_id, std::move(client_socket), std::move(input_buffer), + std::move(output_buffer)), + m_destinationSocket(std::move(dest_socket)) {} + +void ProxySession::start() { recv_from_both(); } + +void ProxySession::recv_from_both() { BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__)); m_clientSocket.async_read_some( - +m_inBuf, - boost::bind(&ProxySessionTcp::recv_from_client, shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); - m_destinationSocket.async_read_some( - +m_outBuf, - boost::bind(&ProxySessionTcp::recv_from_destination, shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + +m_inBuf, boost::bind(&ProxySession::recv_from_client, shared_from_this(), + asio::placeholders::error, + asio::placeholders::bytes_transferred)); + m_destinationSocket->read( + m_outBuf, boost::bind(&ProxySession::recv_from_destination, + shared_from_this(), asio::placeholders::error, + asio::placeholders::bytes_transferred)); } -void ProxySessionTcp::recv_from_destination(const boost::system::error_code &ec, - std::size_t length) { +void ProxySession::recv_from_destination(const system::error_code &ec, + std::size_t length) { BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__)); if (ec || length == 0) { - m_destinationSocket.cancel(); + m_destinationSocket->cancel(); return; } m_outBuf += length; - boost::asio::async_write( - m_clientSocket, -m_outBuf, boost::asio::transfer_all(), - boost::bind(&ProxySessionTcp::handle_client_write, shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + async_write(m_clientSocket, -m_outBuf, transfer_all(), + boost::bind(&ProxySession::handle_client_write, + shared_from_this(), asio::placeholders::error, + asio::placeholders::bytes_transferred)); } -void ProxySessionTcp::recv_from_client(const boost::system::error_code &ec, - std::size_t length) { +void ProxySession::recv_from_client(const system::error_code &ec, + std::size_t length) { BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__)); if (ec || length == 0) { - m_destinationSocket.cancel(); + m_destinationSocket->cancel(); return; } m_inBuf += length; - boost::asio::async_write( - m_destinationSocket, -m_inBuf, boost::asio::transfer_exactly(length), - boost::bind(&ProxySessionTcp::handle_destination_write, - shared_from_this(), boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + m_destinationSocket->write( + m_inBuf, length, + boost::bind(&ProxySession::handle_destination_write, shared_from_this(), + asio::placeholders::error, + asio::placeholders::bytes_transferred)); } -void ProxySessionTcp::handle_client_write(const boost::system::error_code &ec, - std::size_t length) { +void ProxySession::handle_client_write(const system::error_code &ec, + std::size_t length) { BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__)); if (ec || length == 0) { @@ -378,28 +487,26 @@ void ProxySessionTcp::handle_client_write(const boost::system::error_code &ec, } m_outBuf -= length; - m_destinationSocket.async_read_some( - +m_outBuf, - boost::bind(&ProxySessionTcp::recv_from_destination, shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + m_destinationSocket->read( + m_outBuf, boost::bind(&ProxySession::recv_from_destination, + shared_from_this(), asio::placeholders::error, + asio::placeholders::bytes_transferred)); } -void ProxySessionTcp::handle_destination_write( - const boost::system::error_code &ec, std::size_t length) { +void ProxySession::handle_destination_write(const system::error_code &ec, + std::size_t length) { BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__)); if (ec || length == 0) { - m_destinationSocket.cancel(); + m_destinationSocket->cancel(); return; } m_inBuf -= length; m_clientSocket.async_read_some( - +m_inBuf, - boost::bind(&ProxySessionTcp::recv_from_client, shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + +m_inBuf, boost::bind(&ProxySession::recv_from_client, shared_from_this(), + asio::placeholders::error, + asio::placeholders::bytes_transferred)); } ProxyServer::ProxyServer(io_context &ioc, const tcp::endpoint &local_endpoint) @@ -407,21 +514,219 @@ ProxyServer::ProxyServer(io_context &ioc, const tcp::endpoint &local_endpoint) ProxyServer::ProxyServer(io_context &ioc, const std::string &listen_addr, std::uint16_t listen_port) - : ProxyServer(ioc, tcp::endpoint(boost::asio::ip::make_address(listen_addr), - listen_port)) {} + : ProxyServer(ioc, + tcp::endpoint(ip::make_address(listen_addr), listen_port)) {} void ProxyServer::start() { async_accept(); } +void ProxyServer::stop() { m_acceptor.cancel(); } + void ProxyServer::async_accept() { m_acceptor.async_accept( - boost::asio::make_strand(m_acceptor.get_executor()), - [this](const boost::system::error_code &ec, tcp::socket client_socket) { + make_strand(m_acceptor.get_executor()), + [this](const system::error_code &ec, tcp::socket client_socket) { + if (!ec) { + auto auth_session = std::make_shared<ProxyAuth>( + m_nextSessionId.fetch_add(1, std::memory_order_relaxed), + std::move(client_socket)); + + if (auth_session) { + auth_session->set_session_buffer_size(BUFSIZ); + auth_session->start([](any_io_executor exec) { + auto aptr = new AsyncDestinationSocket<any_io_executor>(exec); + return std::shared_ptr<DestinationSocketBase>(std::move(aptr)); + }); + } + } + async_accept(); + }); +} + +template <typename Executor> +void LoggingAsyncDestinationSocket<Executor>::do_connect_tcp( + boost::asio::ip::tcp::resolver::results_type::const_iterator &it, + std::function<void(system::error_code)> handler) { + const auto endpoint = it->endpoint(); + { + std::lock_guard log_mtx{g_loggingMutex}; + std::cout << "LoggingProxyServer::do_connect_tcp(): " + << endpoint.address().to_string() << ":" << endpoint.port() + << "\n"; + } + AsyncDestinationSocket<Executor>::do_connect_tcp(it, handler); +} + +template <typename Executor> +bool LoggingAsyncDestinationSocket<Executor>::do_read( + BufferBase &buffer, + std::function<void(system::error_code, std::size_t)> handler) { + return AsyncDestinationSocket<Executor>::do_read( + buffer, [this, handler](system::error_code ec, std::size_t length) { + m_bytesRead.fetch_add(length, std::memory_order_relaxed); + handler(ec, length); + }); +} + +template <typename Executor> +bool LoggingAsyncDestinationSocket<Executor>::do_write( + BufferBase &buffer, std::size_t length, + std::function<void(system::error_code, std::size_t)> handler) { + return AsyncDestinationSocket<Executor>::do_write( + buffer, length, + [this, handler](system::error_code ec, std::size_t length) { + m_bytesWritten.fetch_add(length, std::memory_order_relaxed); + handler(ec, length); + }); +} + +LoggingProxyServer::LoggingProxyServer(io_context &ioc, + const std::string &listen_addr, + std::uint16_t listen_port) + : ProxyServer(ioc, listen_addr, listen_port), m_statusLogger(ioc), + m_bytesRead{0}, m_bytesWritten{0} {} + +void LoggingProxyServer::start() { + { + std::lock_guard log_mtx{g_loggingMutex}; + std::cout << "LoggingProxyServer::start()\n"; + } + ProxyServer::start(); + async_timer(); +} + +void LoggingProxyServer::stop() { + { + std::lock_guard log_mtx{g_loggingMutex}; + std::cout << "LoggingProxyServer::stop()\n"; + } + ProxyServer::stop(); + m_statusLogger.cancel(); +} + +void LoggingProxyServer::async_timer() { + m_statusLogger.expires_from_now(boost::posix_time::seconds(1)); + m_statusLogger.async_wait([this](const system::error_code &ec) { + if (ec) { + { + std::lock_guard log_mtx{g_loggingMutex}; + std::cout << "LoggingProxyServer::async_timer() ERROR: " << ec << "\n"; + } + return; + } + + std::size_t total_ds = 0; + for (const auto &weak_ds : m_weakDestinationSockets) { + auto shared_ds = weak_ds.lock(); + if (!shared_ds) + continue; + total_ds++; + m_bytesRead += shared_ds->get_bytes_read(); + m_bytesWritten += shared_ds->get_bytes_written(); + } + m_weakDestinationSockets.erase( + std::remove_if(m_weakDestinationSockets.begin(), + m_weakDestinationSockets.end(), + [](const std::weak_ptr< + LoggingAsyncDestinationSocket<any_io_executor>> &w) { + return w.expired(); + }), + m_weakDestinationSockets.end()); + { + std::lock_guard log_mtx{g_loggingMutex}; + std::cout << "LoggingProxyServer::async_timer(): served " + << m_nextSessionId.load(std::memory_order_relaxed) - 1 + << " sessions, " << total_ds << " active sessions, " + << m_bytesRead << " bytes read, " << m_bytesWritten + << " bytes written\n"; + } + async_timer(); + }); +} + +void LoggingProxyServer::async_accept() { + m_acceptor.async_accept( + make_strand(m_acceptor.get_executor()), + [this](const system::error_code &ec, tcp::socket client_socket) { if (!ec) { - auto auth_session = std::make_shared<ProxySessionAuth>( + auto const client_endpoint = client_socket.remote_endpoint(); + { + std::lock_guard log_mtx{g_loggingMutex}; + std::cout << "LoggingProxyServer::async_accept() ACCEPT: id " + << m_nextSessionId.load(std::memory_order_relaxed) + << " from " << client_endpoint.address().to_string() + << ":" << client_endpoint.port() << "\n"; + } + + auto auth_session = std::make_shared<ProxyAuth>( m_nextSessionId.fetch_add(1, std::memory_order_relaxed), std::move(client_socket)); - if (auth_session) - auth_session->start(); + + if (auth_session) { + auth_session->set_session_buffer_size(BUFSIZ); + auth_session->start([this](any_io_executor exec) { + auto shared_ptr = std::make_shared< + LoggingAsyncDestinationSocket<any_io_executor>>(exec); + std::weak_ptr<LoggingAsyncDestinationSocket<any_io_executor>> + weak_ptr(shared_ptr); + m_weakDestinationSockets.push_back(weak_ptr); + return shared_ptr; + }); + } + } else { + { + std::lock_guard log_mtx{g_loggingMutex}; + std::cout << "LoggingProxyServer::async_accept() ERROR: " << ec + << "\n"; + } + return; + } + async_accept(); + }); +} + +template <typename Executor> +bool CustomProtocolAsyncDestinationSocket<Executor>::do_read( + BufferBase &buffer, + std::function<void(system::error_code, std::size_t)> handler) { + buffer += {'C', 'U', 'S', 'T', 'O', 'M'}; + return AsyncDestinationSocket<Executor>::do_read(buffer, handler); +} + +template <typename Executor> +bool CustomProtocolAsyncDestinationSocket<Executor>::do_write( + BufferBase &buffer, std::size_t length, + std::function<void(system::error_code, std::size_t)> handler) { + buffer += {'P', 'R', 'O', 'T', 'O', 'C', 'O', 'L'}; + return AsyncDestinationSocket<Executor>::do_write(buffer, length + 8, + handler); +} + +CustomProtocolProxyServer::CustomProtocolProxyServer( + io_context &ioc, const std::string &listen_addr, std::uint16_t listen_port) + : ProxyServer(ioc, listen_addr, listen_port) {} + +void CustomProtocolProxyServer::start() { ProxyServer::start(); } + +void CustomProtocolProxyServer::stop() { ProxyServer::stop(); } + +void CustomProtocolProxyServer::async_accept() { + m_acceptor.async_accept( + make_strand(m_acceptor.get_executor()), + [this](const system::error_code &ec, tcp::socket client_socket) { + if (!ec) { + auto auth_session = std::make_shared<ProxyAuth>( + m_nextSessionId.fetch_add(1, std::memory_order_relaxed), + std::move(client_socket)); + + if (auth_session) { + auth_session->set_session_buffer_size(BUFSIZ); + auth_session->start([](any_io_executor exec) { + auto aptr = + new CustomProtocolAsyncDestinationSocket<any_io_executor>( + exec); + return std::shared_ptr<DestinationSocketBase>(std::move(aptr)); + }); + } } async_accept(); }); @@ -1,80 +1,112 @@ #include <atomic> #include <boost/asio.hpp> #include <boost/asio/buffer.hpp> +#include <boost/asio/deadline_timer.hpp> #include <boost/asio/io_context.hpp> #include <boost/asio/ip/tcp.hpp> #include <boost/core/noncopyable.hpp> +#include <boost/date_time/posix_time/posix_time_duration.hpp> #include <boost/noncopyable.hpp> #include <boost/system/detail/error_code.hpp> #include <boost/system/error_code.hpp> +#include <boost/variant.hpp> #include <cstdint> #include <memory> #include <string_view> +#include "boost-asio-fastbuffer/fastbuffer.hpp" + namespace SOCKS5 { -class StreamBuffer : public boost::noncopyable { +class DestinationSocketBase : private boost::noncopyable { public: - explicit StreamBuffer(std::size_t size) - : m_bufferUsed{0}, m_bufferSize{size} { - m_buffer = new std::uint8_t[size]; - } - StreamBuffer(StreamBuffer &&moveable) { - m_bufferUsed = moveable.m_bufferUsed; - m_bufferSize = moveable.m_bufferSize; - m_buffer = moveable.m_buffer; - moveable.m_buffer = nullptr; - } - ~StreamBuffer() { delete[] m_buffer; } - void operator+=(std::size_t commit_size) { m_bufferUsed += commit_size; } - void operator+=(const std::initializer_list<uint8_t> &to_add) { - std::copy(to_add.begin(), to_add.end(), &m_buffer[m_bufferUsed]); - m_bufferUsed += to_add.size(); - } - void operator-=(std::size_t consume_size) { m_bufferUsed -= consume_size; } - auto operator+() { - return boost::asio::buffer(&m_buffer[m_bufferUsed], - m_bufferSize - m_bufferUsed); + virtual ~DestinationSocketBase() {} + + template <typename Callback> + void + connect_tcp(boost::asio::ip::tcp::resolver::results_type::const_iterator &it, + Callback &&handler) { + do_connect_tcp(it, std::forward<Callback>(handler)); } - auto operator-() { return boost::asio::buffer(&m_buffer[0], m_bufferUsed); } - auto operator[](std::size_t index) const { return m_buffer[index]; } - const auto *operator()(std::size_t index = 0) const { - return &m_buffer[index]; + void tcp_bind() { throw std::runtime_error("TCP Bind not implemented"); } + void udp_bind() { throw std::runtime_error("UDP Bind not implemented"); } + template <typename Callback> + bool read(BufferBase &buffer, Callback &&handler) { + return do_read(buffer, std::forward<Callback>(handler)); } - auto size() const { return m_bufferUsed; } - auto getHealth() const { - return (static_cast<float>(m_bufferUsed) / - static_cast<float>(m_bufferSize)); + template <typename Callback> + bool write(BufferBase &buffer, std::size_t length, Callback &&handler) { + return do_write(buffer, length, std::forward<Callback>(handler)); } + virtual bool cancel() = 0; -private: - std::size_t m_bufferUsed; - std::size_t m_bufferSize; - std::uint8_t *m_buffer; +protected: + virtual void do_connect_tcp( + boost::asio::ip::tcp::resolver::results_type::const_iterator &it, + std::function<void(boost::system::error_code)>) = 0; + virtual bool + do_read(BufferBase &, + std::function<void(boost::system::error_code, std::size_t)>) = 0; + virtual bool + do_write(BufferBase &, std::size_t, + std::function<void(boost::system::error_code, std::size_t)>) = 0; +}; + +template <typename Executor> +class AsyncDestinationSocket : public DestinationSocketBase { +public: + AsyncDestinationSocket(const Executor &exec) : m_strand(exec) {} + ~AsyncDestinationSocket() {} + +protected: + void do_connect_tcp( + boost::asio::ip::tcp::resolver::results_type::const_iterator &it, + std::function<void(boost::system::error_code)> handler) override; + bool do_read(BufferBase &buffer, + std::function<void(boost::system::error_code, std::size_t)> + handler) override; + bool do_write(BufferBase &buffer, std::size_t length, + std::function<void(boost::system::error_code, std::size_t)> + handler) override; + bool cancel() override; + + boost::asio::strand<Executor> m_strand; + boost::optional<boost::variant<boost::asio::ip::tcp::socket // TCP connect + >> + m_socket; }; -class ProxySessionBase : public boost::noncopyable { +class ProxyBase : private boost::noncopyable { public: - ProxySessionBase(std::uint32_t session_id, - boost::asio::ip::tcp::socket &&client_socket, - std::size_t buffer_size = BUFSIZ); - ProxySessionBase(std::uint32_t session_id, - boost::asio::ip::tcp::socket &&client_socket, - StreamBuffer &&input_buffer, StreamBuffer &&output_buffer); + ProxyBase(std::uint32_t session_id, + boost::asio::ip::tcp::socket &&client_socket, + std::size_t buffer_size = BUFSIZ); + ProxyBase(std::uint32_t session_id, + boost::asio::ip::tcp::socket &&client_socket, + ContiguousStreamBuffer &&input_buffer, + ContiguousStreamBuffer &&output_buffer); +protected: std::uint32_t m_sessionId; - StreamBuffer m_inBuf; - StreamBuffer m_outBuf; + ContiguousStreamBuffer m_inBuf; + ContiguousStreamBuffer m_outBuf; boost::asio::ip::tcp::socket m_clientSocket; }; -class ProxySessionAuth : public ProxySessionBase, - public std::enable_shared_from_this<ProxySessionAuth> { +class ProxyAuth : private ProxyBase, + public std::enable_shared_from_this<ProxyAuth> { public: - ProxySessionAuth(std::uint32_t session_id, - boost::asio::ip::tcp::socket &&client_socket); - void start(); + ProxyAuth(std::uint32_t session_id, + boost::asio::ip::tcp::socket &&client_socket); + template <typename GetDsCallback> void start(GetDsCallback &&handler) { + m_getDestinationSocket = std::forward<GetDsCallback>(handler); + start_internal(); + } + void set_session_buffer_size(std::size_t buffer_size) { + session_buffer_size = buffer_size; + } private: + void start_internal(); void recv_client_greeting(const boost::system::error_code &ec, std::size_t length); void send_server_greeting(bool auth_supported); @@ -82,27 +114,38 @@ private: std::size_t length); void process_connection_request(); void send_server_response(std::uint8_t proxy_cmd, std::uint8_t status_code); - void resolve_destination_host(std::uint8_t proxy_cmd, - const std::string_view &host, - std::uint16_t port); + void resolve_tcp_destination_host(std::uint8_t proxy_cmd, + const std::string_view &host, + std::uint16_t port); void connect_to_destination(std::uint8_t proxy_cmd); void handle_write(const boost::system::error_code &ec, std::size_t length); void handle_response_write(std::uint8_t proxy_cmd, std::uint8_t status_code, const boost::system::error_code &ec, std::size_t length); - boost::asio::ip::tcp::resolver m_resolver; - boost::asio::ip::tcp::endpoint m_endpoint; - boost::asio::ip::tcp::socket m_destinationSocket; + std::size_t session_buffer_size; + std::function<std::shared_ptr<DestinationSocketBase>( + boost::asio::any_io_executor)> + m_getDestinationSocket; + std::shared_ptr<DestinationSocketBase> m_destinationSocket; + boost::optional<boost::asio::ip::tcp::resolver> m_tcp_resolver; + boost::asio::ip::tcp::resolver::results_type m_tcp_resolver_results; + boost::asio::ip::tcp::resolver::results_type::const_iterator + m_tcp_resolver_iter; }; -class ProxySessionTcp : public ProxySessionBase, - public std::enable_shared_from_this<ProxySessionTcp> { +class ProxySession : private ProxyBase, + public std::enable_shared_from_this<ProxySession> { public: - explicit ProxySessionTcp(std::uint32_t session_id, - boost::asio::ip::tcp::socket &&client_socket, - boost::asio::ip::tcp::socket &&destination_socket, - std::size_t buffer_size = 65535); + ProxySession(std::uint32_t session_id, + boost::asio::ip::tcp::socket &&client_socket, + std::shared_ptr<DestinationSocketBase> &&dest_socket, + std::size_t buffer_size); + ProxySession(std::uint32_t session_id, + boost::asio::ip::tcp::socket &&client_socket, + std::shared_ptr<DestinationSocketBase> &&dest_socket, + ContiguousStreamBuffer &&input_buffer, + ContiguousStreamBuffer &&output_buffer); void start(); private: @@ -116,21 +159,105 @@ private: void handle_destination_write(const boost::system::error_code &ec, std::size_t length); - boost::asio::ip::tcp::socket m_destinationSocket; + std::shared_ptr<DestinationSocketBase> m_destinationSocket; }; -class ProxyServer : public boost::noncopyable { +class ProxyServer : private boost::noncopyable { public: ProxyServer(boost::asio::io_context &ioc, const boost::asio::ip::tcp::endpoint &local_endpoint); ProxyServer(boost::asio::io_context &ioc, const std::string &listen_addr, std::uint16_t listen_port); void start(); + void stop(); -private: - void async_accept(); +protected: + virtual void async_accept(); std::atomic<uint32_t> m_nextSessionId; boost::asio::ip::tcp::acceptor m_acceptor; }; + +// End Of Minimal Implementation + +template <typename Executor> +class LoggingAsyncDestinationSocket : public AsyncDestinationSocket<Executor> { +public: + LoggingAsyncDestinationSocket(const Executor &exec) + : AsyncDestinationSocket<Executor>(exec), m_bytesRead{0}, + m_bytesWritten{0} {} + ~LoggingAsyncDestinationSocket() {} + std::size_t get_bytes_read() { + return m_bytesRead.exchange(0, std::memory_order_relaxed); + } + std::size_t get_bytes_written() { + return m_bytesWritten.exchange(0, std::memory_order_relaxed); + } + +private: + void do_connect_tcp( + boost::asio::ip::tcp::resolver::results_type::const_iterator &it, + std::function<void(boost::system::error_code)> handler) override; + bool do_read(BufferBase &buffer, + std::function<void(boost::system::error_code, std::size_t)> + handler) override; + bool do_write(BufferBase &buffer, std::size_t length, + std::function<void(boost::system::error_code, std::size_t)> + handler) override; + + std::atomic<std::size_t> m_bytesRead; + std::atomic<std::size_t> m_bytesWritten; +}; + +class LoggingProxyServer : public ProxyServer { +public: + LoggingProxyServer(boost::asio::io_context &ioc, + const std::string &listen_addr, std::uint16_t listen_port); + void start(); + void stop(); + +private: + void async_timer(); + void async_accept() override; + + std::vector<std::weak_ptr< + LoggingAsyncDestinationSocket<boost::asio::any_io_executor>>> + m_weakDestinationSockets; + boost::asio::deadline_timer m_statusLogger; + std::atomic<std::size_t> m_bytesRead; + std::atomic<std::size_t> m_bytesWritten; +}; + +// The following is just an example if you want to use a custom proxy/tunnel +// protocol ;) + +template <typename Executor> +class CustomProtocolAsyncDestinationSocket + : public AsyncDestinationSocket<Executor> { +public: + CustomProtocolAsyncDestinationSocket(const Executor &exec) + : AsyncDestinationSocket<Executor>(exec) {} + ~CustomProtocolAsyncDestinationSocket() {} + +private: + bool do_read(BufferBase &buffer, + std::function<void(boost::system::error_code, std::size_t)> + handler) override; + bool do_write(BufferBase &buffer, std::size_t length, + std::function<void(boost::system::error_code, std::size_t)> + handler) override; +}; + +class CustomProtocolProxyServer : public ProxyServer { +public: + CustomProtocolProxyServer(boost::asio::io_context &ioc, + const std::string &listen_addr, + std::uint16_t listen_port); + void start(); + void stop(); + +private: + void async_accept() override; +}; + }; // namespace SOCKS5 |