2019独角兽企业重金招聘Python工程师标准>>>
ServerBootstrap介绍
ServerBootstrap,顾名思义,它是作为Wangle服务端的一个启动辅助类,熟悉Netty的朋友都知道builder模式,它的唯一目的就是以方便的、统一的方式启动一个Server。
示例
下面以官方提供了一个Echo demo为例,先宏观了解一下ServerBootstrap的用法,代码如下:
typedef Pipeline<IOBufQueue&, std::string> EchoPipeline;
class EchoPipelineFactory : public PipelineFactory<EchoPipeline> {
public:
EchoPipeline::Ptr newPipeline(std::shared_ptr<AsyncTransportWrapper> sock) {
auto pipeline = EchoPipeline::create();
pipeline->addBack(AsyncSocketHandler(sock));
pipeline->addBack(LineBasedFrameDecoder(8192));
pipeline->addBack(StringCodec());
pipeline->addBack(EchoHandler());
pipeline->finalize();
return pipeline;
}
};
int main(int argc, char** argv) {
google::ParseCommandLineFlags(&argc, &argv, true);
ServerBootstrap<EchoPipeline> server;
server.childPipeline(std::make_shared<EchoPipelineFactory>());
server.bind(FLAGS_port);
server.waitForStop();
return 0;
}
乍一看,是不是觉得和Netty很像,的确,Wangle在设计思想上很大部分都是借鉴了Netty。在定义ServerBootstrap时需要指定一个模板参数,这个参数为新连接对应的Pipleline的类型,上例中是一个Read类型为IOBufQueue&、Write类型为std::string的Pipeline。有了Pipeline类型还不够,还需要一个Pipeline类型工厂专门用于创建该类型的Pipeline(上例中为EchoPipelineFactory),使用ServerBootstrap的childPipeline方法设置。设置完成之后就可以调用bind进行端口监听了。其实ServerBootstrap上还可以设置很多选项,比如使用group方法设置acceptor线程池和io线程池等。
bind过程源码分析
bind有多个重载版本,本文以直接绑定一个本地端口为例(这是最常用的用法),源码为:
void bind(int port) {
CHECK(port >= 0);
folly::SocketAddress address;
// 设置本地地址
address.setFromLocalPort(port);
bindImpl(address);
}
bind只是使用setFromLocalPort初始化了一个folly::SocketAddress,然后直接调用bindImpl,继续看bindImpl实现:
void bindImpl(folly::SocketAddress &address) {
// 之前没有手动设置group
if (!workerFactory_) {
group(nullptr);
}
// 如果accept线程数大于1,那么就在所有的accept线程中重用端口进行监听
bool reusePort = reusePort_ || (acceptor_group_->numThreads() > 1);
std::mutex sock_lock;
std::vector<std::shared_ptr<folly::AsyncSocketBase>> new_sockets;
std::exception_ptr exn;
// 定义一个lamda表达式,执行ServerSocket创建和accept操作,该函数一定会在accept线程中执行
auto startupFunc = [&](std::shared_ptr<folly::Baton<>> barrier) {
try {
// 创建服务端监听socket
// 此函数不会阻塞
// AsyncServerSocketFactory
auto socket = socketFactory_->newSocket(address, socketConfig.acceptBacklog, reusePort,
socketConfig);
sock_lock.lock();
new_sockets.push_back(socket);
sock_lock.unlock();
// 获取socket绑定的本地地址
socket->getAddress(&address);
// 唤醒
barrier->post();
} catch (...) {
// 先把异常记录下来
exn = std::current_exception();
barrier->post();
return;
}
};
auto wait0 = std::make_shared<folly::Baton<>>();
// 在acceptor_group_线程池中添加并执行startupFunc任务(异步)
acceptor_group_->add(std::bind(startupFunc, wait0));
wait0->wait();//等待
// 从1开始,在剩下的acceptor线程中启动监听
for (size_t i = 1; i < acceptor_group_->numThreads(); i++) {
auto barrier = std::make_shared<folly::Baton<>>();
acceptor_group_->add(std::bind(startupFunc, barrier));
barrier->wait();
}
// 如果前面有异常
if (exn) {
// 异常重新抛出
std::rethrow_exception(exn);
}
// 遍历new_sockets(所有新创建的listening中的socket)
for (auto &socket : new_sockets) {
// Startup all the threads
workerFactory_->forEachWorker([this, socket](Acceptor *worker) {
// 在工作线程中
socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
[this, worker, socket]() {
// 异步的添加accept回调worker
socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
});
});
// 缓存所有处于listening状态的socket
sockets_->push_back(socket);
}
}
首先,检查workerFactory_是否为空,如果为空,说明之前没有手动调用group设置过acceptor、io线程池,那么此时会先调用group(nullptr)进行线程池的默认设置,group函数源码如下:
ServerBootstrap *group(std::shared_ptr<wangle::IOThreadPoolExecutor> io_group) {
return group(nullptr, io_group);
}
进一步调用group的重载版本:
ServerBootstrap *group(
std::shared_ptr<wangle::IOThreadPoolExecutor> accept_group, // acceptor线程
std::shared_ptr<wangle::IOThreadPoolExecutor> io_group) { // io线程
// 如果没有设置accept线程
if (!accept_group) {
// 就创建一个只有一个线程的线程池负责accept
accept_group = std::make_shared<wangle::IOThreadPoolExecutor>(
1, std::make_shared<wangle::NamedThreadFactory>("Acceptor Thread"));
}
// 如果没有设置IO线程池
if (!io_group) {
auto threads = std::thread::hardware_concurrency();// 返回CPU核数
if (threads <= 0) {
// Reasonable mid-point for concurrency when actual value unknown
threads = 8;
}
// 创建IO线程,线程数为CPU核数(这一步会真正的创建threads个线程)
io_group = std::make_shared<wangle::IOThreadPoolExecutor>(
threads, std::make_shared<wangle::NamedThreadFactory>("IO Thread"));
}
// TODO better config checking
// CHECK(acceptorFactory_ || childPipelineFactory_);
CHECK(!(acceptorFactory_ && childPipelineFactory_));
// 如果自己提供了定制的ServerWorkerPool
if (acceptorFactory_) {
workerFactory_ = std::make_shared<ServerWorkerPool>(
acceptorFactory_,
io_group.get(),
sockets_,
socketFactory_);
} else {
// 否则就是用默认的
workerFactory_ = std::make_shared<ServerWorkerPool>(
/* ServerAcceptorFactory用于创建一个ServerAcceptor,这个ServerAcceptor<Pipeline>
* 负责新建acceptPipeline_,并且它自己还是一个wangle::InboundHandler
* 将自己添加到ServerAcceptor,负责对新的连接的创建和管理
* 注意在不设置时,acceptPipelineFactory_的值默认无DefaultAcceptPipelineFactory(只是单纯的创建了一个空白Pipeline)
* */
std::make_shared<ServerAcceptorFactory<Pipeline>>(acceptPipelineFactory_, childPipelineFactory_,
accConfig_),
io_group.get(),
sockets_,// listening中的sockets,在bind调用之前这里的sockets_为空
socketFactory_);
}
// 为IO线程池添加观察者!这一步会出发调用每一个线程的threadPreviouslyStarted方法
// workerFactory_是一个ThreadPoolExecutor::Observer
io_group->addObserver(workerFactory_);
acceptor_group_ = accept_group;
io_group_ = io_group;
return this;
}
该group函数有两个参数,分别为aceeptor线程池和io线程池,如果两个参数都为nullptr,那么acceptor线程数默认为1,而io线程数为cpu硬件核数,否则就使用参数提供的值,线程池设置完毕之后,会创建workerFactory_。workerFactory_实际上是一个ServerWorkerPool,而ServerWorkerPool实际上是一个线程池的观察者(实现了观察者Observer接口的threadStarted、threadStopped、threadPreviouslyStarted、threadNotYetStoppe方法),ServerWorkerPool构造方法需要四个参数,分别为:创建Acceptor的工厂AcceptorFactory、IO线程池、处于listening中的socket集合、以及用于创建ServerSocket的工厂ServerSocketFactory。其中AcceptorFactory和ServerSocketFactory分别可以通过ServerBootstrap的childHandler方法和channelFactory手动设置,没有设置时分别默认为ServerAcceptorFactory和AsyncServerSocketFactory(大多数情况下都是使用默认值)。在默认情况下,创建ServerAcceptorFactory时,需要显示提供3个参数,他们分别为:AcceptPipelineFactory、childPipelineFactory以及ServerSocketConfig。其中AcceptPipelineFactory的值可以使用ServerBootstrap的pipeline进行设置,如果没有手动设置,默认为DefaultAcceptPipelineFactory(大多数情况都是如此),DefaultAcceptPipelineFactory只是创建一个空白的Pipeline(空白指的是没有添加任何Handler)。childPipelineFactory通过ServerBootstrap的childPipeline方法设置,它主要用来为每个新到来的连接创建Pipeline。上文提到,workerFactory_本质是一个线程池观察者(Observer),那么它是用来观察谁呢?从代码“io_group->addObserver(workerFactory_)”可以看到,它是用来观察io_group(IO线程池),addObserver代码如下:
void ThreadPoolExecutor::addObserver(std::shared_ptr<Observer> o) {
RWSpinLock::ReadHolder r{&threadListLock_};
observers_.push_back(o);
// 遍历线程列表,分别为每个线程调用观察者的threadPreviouslyStarted
for (auto& thread : threadList_.get()) {
o->threadPreviouslyStarted(thread.get());
}
}
首先是遍历线程池中的每一个线程,然后为每个线程调用观察者的threadPreviouslyStarted方法,threadPreviouslyStarted方法代码如下:
virtual void threadPreviouslyStarted(ThreadHandle* h) {
threadStarted(h);
}
进而调用threadStarted:
void ServerWorkerPool::threadStarted(wangle::ThreadPoolExecutor::ThreadHandle *h) {
// 创建一个ServerAcceptor,该Acceptor绑定到一个线程池中,此处的exec_为IO线程池
// exec_->getEventBase(h) 表示获取io线程句柄h对应的eventbase
auto worker = acceptorFactory_->newAcceptor(exec_->getEventBase(h));
{
Mutex::WriteHolder holder(workersMutex_.get());
// 插入映射(IO线程句柄、ServerAcceptor)
workers_->insert({h, worker});
}
// 遍历所有Listening中的socket,理论上在调用bind之前这里应该直接为空
for (auto socket : *sockets_) {
// 在eventbase中执行
socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
[this, worker, socket]() {
// 添加accept回调为ServerAcceptor,也就是会在io线程池中执行ServerAcceptor回调
// 这个回调有connectionAccepted、acceptError、acceptStarted、acceptStopped
socketFactory_->addAcceptCB(socket, worker.get(), worker->getEventBase());
});
}
}
threadStarted函数主要作用就是使用前文设置的acceptorFactory_(默认为ServerAcceptorFactory)来创建一个acceptor(默认为ServerAcceptor),并将其放到workers_(本质为一个map)映射起来,后面还会用到。其中,重点看一下ServerAcceptorFactory的newAcceptor实现:
std::shared_ptr<Acceptor> newAcceptor(folly::EventBase *base) {
auto acceptor = std::make_shared<ServerAcceptor<Pipeline>>(acceptPipelineFactory_, childPipelineFactory_,
accConfig_);
// 初始化这个acceptor
acceptor->init(nullptr, base, nullptr);
return acceptor;
}
首先是创建一个ServerAcceptor,然后调用init对其进行初始化。
void init(folly::AsyncServerSocket *serverSocket,
folly::EventBase *eventBase,
SSLStats *stats = nullptr) override {
// eventBase为io线程的
Acceptor::init(serverSocket, eventBase, stats);
// 创建acceptPipeline,参数为Acceptor
acceptPipeline_ = acceptPipelineFactory_->newPipeline(this);
// 如果设置了childPipelineFactory,这就意味着没有自己提供定制的AcceptPipelineFactory
// 而是采用了默认的,因此需要将ServerAcceptor(本身也是一个Inbound Handler)也添加到
// AcceptPipeline
if (childPipelineFactory_) {
// This means a custom AcceptPipelineFactory was not passed in via
// pipeline() and we're using the DefaultAcceptPipelineFactory.
// Add the default inbound handler here.
acceptPipeline_->addBack(this);
}
acceptPipeline_->finalize();
}
这里需要注意的一点是,ServerAcceptor本身还是一个wangle::InboundHandler<AcceptPipelineType>类型的Handler,所以将其加入到aceeptor Pipeline中(也是acceptor中唯一一个Handler)。当ServerSocket acceptor一个新连接之后,会调用AcceptorCB回调函数,回调函数相应的方法经过一些处理之后就会在acceptor Pipeline中触发相应的事件。具体的过程后文在讲解一个新连接到来过程的时候还会具体的说明。
分析完了group函数,继续回到bindImpl函数:
bool reusePort = reusePort_ || (acceptor_group_->numThreads() > 1);
这行代码的意思是,如果代码显示设置过reusePort_或者是acceptor的线程池中的线程数大于1,那么就开起重用端口设置,也就是说,当存在多个acceptor线程池时,同一个端口会在多个acceptor线程池上同时启用监听。接下来了定义了一个startupFunc lambda表达式,它使用上文设置的socketFactory_的newSocket创建一个AsyncServerSocket,newSocket一共需要四个参数,分别为:要监听的ServerSocket地址、acceptor的backlog大小、是否重用端口标识、ServerSocket配置信息。以AsyncServerSocketFactory为例,代码如下:
std::shared_ptr<folly::AsyncSocketBase> newSocket(
folly::SocketAddress address, int /*backlog*/, bool reuse,
ServerSocketConfig& config) override {
//获取当前线程的eventbase(一定会在accept线程)
auto* evb = folly::EventBaseManager::get()->getEventBase();
// 创建AsyncServerSocket
std::shared_ptr<folly::AsyncServerSocket> socket(new folly::AsyncServerSocket(evb),ThreadSafeDestructor());
//是否重用端口
socket->setReusePortEnabled(reuse);
// 是否使能tcp的fastopen
if (config.enableTCPFastOpen) {
socket->setTFOEnabled(true, config.fastOpenQueueSize);
}
// 绑定的地址
socket->bind(address);
// 设置监听参数,启动监听
socket->listen(config.acceptBacklog);
// 开始accept,这里不会阻塞,只是向事件层注册了持久的read事件
socket->startAccepting();
return socket;
}
前面只是定义了一个startupFunc函数并没有执行,那么它在哪里执行呢?继续看bindImpl后面的代码:
auto wait0 = std::make_shared<folly::Baton<>>();
// 在acceptor_group_线程池中添加并执行startupFunc任务(异步)
acceptor_group_->add(std::bind(startupFunc, wait0));
wait0->wait();//等待
可以看到,startupFunc函数被放在了acceptor_group_线程池中执行。同时,如前文所说,如果acceptor线程数大于1,那么会在所有的acceptor线程池中启用监听,代码如下:
for (size_t i = 1; i < acceptor_group_->numThreads(); i++) {
auto barrier = std::make_shared<folly::Baton<>>();
acceptor_group_->add(std::bind(startupFunc, barrier));
barrier->wait();
}
bindImpl函数的最后:
// 遍历new_sockets(所有新创建的listening中的socket)
for (auto &socket : new_sockets) {
// 遍历IO线程池
workerFactory_->forEachWorker([this, socket](Acceptor *worker) {
// 在acceptor线程中执行
socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
[this, worker, socket]() {
// 异步的添加accept回调worker
socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
});
});
// 缓存所有处于listening状态的socket
sockets_->push_back(socket);
}
遍历处于listening状态的socket,并为每一个scoket设置AcceptCB(acceptor回调函数,在每一个socket acceptor一个连接后会执行acceptor回调函数)和回调函数执行的线程(此处为IO线程)。这里的回调函数其实就是一个Acceptor,它是在什么时候创建的呢?还记得前文为IO线程池添加观察者时的threadStarted函数吗?是的,就是在threadStarted中为了IO线程池创建了一个Acceptor,来看一下Acceptor的继承关系:
class Acceptor :
public folly::AsyncServerSocket::AcceptCallback,
public wangle::ConnectionManager::Callback,
public folly::AsyncUDPServerSocket::Callback
重点看一下folly::AsyncServerSocket::AcceptCallback:
class AcceptCallback {
public:
virtual ~AcceptCallback() = default;
virtual void connectionAccepted(int fd,const SocketAddress& clientAddr)noexcept = 0;
virtual void acceptError(const std::exception& ex) noexcept = 0;
virtual void acceptStarted() noexcept {}
virtual void acceptStopped() noexcept {}
};
该回调接口分别对应了acceptor的不同状态。
完整的acceptor过程
前文重点讲解了bind的过程,概括来说主要完成看:创建AcceptorPipeline、启动端口监听、为ServerSocket设置Acceptor回调函数等。
那么当一个处于listening状态的ServerSocket accept一个新连接时会发生什么事情呢?
前文提到过,每一个处于listening状态的ServerSocker都设置了一个AcceptorCB,这里AcceptorCB就是ServerAcceptor,也就是当有一个新连接被accept时,Acceptor中的connectionAccepted就会被调用,connectionAccepted只是记录了一下accept时间然后调用onDoneAcceptingConnection:
void Acceptor::connectionAccepted(
int fd, const SocketAddress &clientAddr) noexcept {
namespace fsp = folly::portability::sockets;
if (!canAccept(clientAddr)) {
// Send a RST to free kernel memory faster
struct linger optLinger = {1, 0};
fsp::setsockopt(fd, SOL_SOCKET, SO_LINGER, &optLinger, sizeof(optLinger));
close(fd);
return;
}
// 记录accept开始时间
auto acceptTime = std::chrono::steady_clock::now();
for (const auto &opt: socketOptions_) {
opt.first.apply(fd, opt.second);
}
onDoneAcceptingConnection(fd, clientAddr, acceptTime);
}
onDoneAcceptingConnection定义了连接的TransportInfo,继续调用processEstablishedConnection。
void Acceptor::onDoneAcceptingConnection(
int fd,
const SocketAddress &clientAddr,
std::chrono::steady_clock::time_point acceptTime) noexcept {
TransportInfo tinfo;
processEstablishedConnection(fd, clientAddr, acceptTime, tinfo);
}
processEstablishedConnection中主要处理了一大堆和ssl相关的问题,关键代码如下:
tinfo.secure = false;
tinfo.acceptTime = acceptTime;
// 创建AsyncSocket,此处的base_是之前init的时候传进来的IO线程
AsyncSocket::UniquePtr sock(makeNewAsyncSocket(base_, fd));
tinfo.tfoSucceded = sock->getTFOSucceded();
plaintextConnectionReady(
std::move(sock),
clientAddr,
empty_string,
SecureTransportType::NONE,
tinfo);
最终processEstablishedConnection会调用plaintextConnectionReady函数,代码如下:
void Acceptor::plaintextConnectionReady(
AsyncTransportWrapper::UniquePtr sock,
const SocketAddress &clientAddr,
const string &nextProtocolName,
SecureTransportType secureTransportType,
TransportInfo &tinfo) {
connectionReady(
std::move(sock),
clientAddr,
nextProtocolName,
secureTransportType,
tinfo);
}
plaintextConnectionReady会调用connectionReady,connectionReady最终会调用onNewConnection,而onNewConnection是Acceptor定义的一个抽象方法 ,ServerAcceptor将其实现为:
void onNewConnection(folly::AsyncTransportWrapper::UniquePtr transport,
const folly::SocketAddress *clientAddr,
const std::string &nextProtocolName,
SecureTransportType secureTransportType,
const TransportInfo &tinfo) override {
ConnInfo connInfo = {transport.release(), clientAddr, nextProtocolName, secureTransportType, tinfo};
// 在acceptPipeline传播read
acceptPipeline_->read(connInfo);
}
重点关注“acceptPipeline_->read(connInfo)”,该连接最终会在Acceptor Pipeline中以read事件传播,前文也说过,accept pipeline中默认只有一个唯一的Handler,就是ServerAcceptor本身,那么看一下ServerAcceptor中的read方法都干了什么:
void read(Context *, AcceptPipelineType conn) override {
if (conn.type() != typeid(ConnInfo &)) {
return;
}
auto connInfo = boost::get<ConnInfo &>(conn);
folly::AsyncTransportWrapper::UniquePtr transport(connInfo.sock);
// Setup local and remote addresses
auto tInfoPtr = std::make_shared<TransportInfo>(connInfo.tinfo);
tInfoPtr->localAddr = std::make_shared<folly::SocketAddress>(accConfig_.bindAddress);
transport->getLocalAddress(tInfoPtr->localAddr.get());
tInfoPtr->remoteAddr = std::make_shared<folly::SocketAddress>(*connInfo.clientAddr);
tInfoPtr->appProtocol = std::make_shared<std::string>(connInfo.nextProtoName);
// 为新连接创建一个pipeline(参数为AsyncTransport)
auto pipeline = childPipelineFactory_->newPipeline(
std::shared_ptr<folly::AsyncTransportWrapper>(
transport.release(), folly::DelayedDestruction::Destructor()));
// 设置TransportInfo
pipeline->setTransportInfo(tInfoPtr);
// 创建一个新的可被管理的连接,并绑定pipeline(相当于Netty中的Channel)
auto connection = new ServerConnection(std::move(pipeline));
// 将连接管理起来
Acceptor::addConnection(connection);
// 初始化这个连接
connection->init();
}
首先,设置了远端地址和本地地址,然后使用childPipelineFactory_创建了一个新连接的Pipeline,最后初始化这个新连接,init方法非常简单:
void init() {
// 在该连接绑定的pipeline中引发Active事件
pipeline_->transportActive();
}
至此,是不是看到了熟悉的身影,没错,从此处,事件便开始在你的业务handler中传播了。如果要从最源头考虑一个新连接建立的过程(包括folly、libevent那一层),那么可以简单归纳为:
handlerReady->consumeMessages->messageAvailable->connectionAccepted->onDoneAcceptingConnection
->processEstablishedConnection->plaintextConnectionReady->connectionReady->onNewConnection
下面来一张图,便于理解:
本系列文章
Wangle源码分析:Service
Wangle源码分析:ServerBootstrap
Wangle源码分析:编解码Handler
Wangle源码分析:EventBaseHandler、AsyncSocketHandler
Wangle源码分析:Pipeline、Handler、Context
Wangle源码分析:ClientBootstrap