摘要:?Dispatcher 在Envoy的代碼中Dispatcher是隨處可見的,可以說在Envoy中有著舉足輕重的地位,一個Dispatcher就是一個EventLoop,其承擔了任務隊列、網(wǎng)絡事件處理、定時器、信號處理等核心功能。
Dispatcher
在Envoy的代碼中Dispatcher是隨處可見的,可以說在Envoy中有著舉足輕重的地位,一個Dispatcher就是一個EventLoop,其承擔了任務隊列、網(wǎng)絡事件處理、定時器、信號處理等核心功能。在Envoy threading model這篇文章所提到的EventLoop(Each worker thread runs a “non-blocking” event loop)指的就是這個Dispatcher對象。這個部分的代碼相對較獨立,和其他模塊耦合也比較少,但重要性卻不言而喻。下面是與Dispatcher相關的類圖,在接下來會對其中的關鍵概念進行介紹。

Dispatcher 和 Libevent
Dispatcher本質上就是一個EventLoop,Envoy并沒有重新實現(xiàn),而是復用了Libevent中的event_base,在Libevent的基礎上進行了二次封裝并抽象出一些事件類,比如FileEvent、SignalEvent、Timer等。Libevent是一個C庫,而Envoy是C++,為了避免手動管理這些C結構的內存,Envoy通過繼承unique_ptr的方式重新封裝了這些libevent暴露出來的C結構。
template
class CSmartPtr : public std::unique_ptr
public:
CSmartPtr() : std::unique_ptr
CSmartPtr(T* object) : std::unique_ptr
};
通過CSmartPtr就可以將Libevent中的一些C數(shù)據(jù)結構的內存通過RAII機制自動管理起來,使用方式如下:
extern "C" {
void event_base_free(event_base*);
}
struct evbuffer;
extern "C" {
void evbuffer_free(evbuffer*);
}
.....
typedef CSmartPtr
typedef CSmartPtr
typedef CSmartPtr
typedef CSmartPtr
在Libevent中無論是定時器到期、收到信號、還是文件可讀寫等都是事件,統(tǒng)一使用event類型來表示,Envoy中則將event作為ImplBase的成員,然后讓所有的事件類型的對象都繼承ImplBase,從而實現(xiàn)了事件的抽象。
class ImplBase {
protected:
~ImplBase();
event raw_event_;
};
SignalEvent
SignalEvent的實現(xiàn)很簡單,通過evsignal_assign來初始化事件,然后通過evsignal_add添加事件使事件成為未決狀態(tài)(關于Libevent事件狀態(tài)見附錄)。
class SignalEventImpl : public SignalEvent, ImplBase {
public:
// signal_num: 要設置的信號值
// cb: 信號事件的處理函數(shù)
SignalEventImpl(DispatcherImpl& dispatcher, int signal_num, SignalCb cb);
private:
SignalCb cb_;
};
SignalEventImpl::SignalEventImpl(DispatcherImpl& dispatcher,
int signal_num, SignalCb cb) : cb_(cb) {
evsignal_assign(
&raw_event_, &dispatcher.base(), signal_num,
[](evutil_socket_t, short, void* arg) -> void {
static_cast
this);
evsignal_add(&raw_event_, nullptr);
}
Timer
Timer事件暴露了兩個接口一個用于關閉Timer,另外一個則用于啟動Timer,需要傳遞一個時間來設置Timer的到期時間間隔。
class Timer {
public:
virtual ~Timer() {}
virtual void disableTimer() PURE;
virtual void enableTimer(const std::chrono::milliseconds& d) PURE;
};
創(chuàng)建Timer的時候會通過evtimer_assgin對event進行初始化,這個時候事件還處于未決狀態(tài)而不會觸發(fā),需要通過event_add添加到Dispatcher中才能被觸發(fā)。
class TimerImpl : public Timer, ImplBase {
public:
TimerImpl(Libevent::BasePtr& libevent, TimerCb cb);
// Timer
void disableTimer() override;
void enableTimer(const std::chrono::milliseconds& d) override;
private:
TimerCb cb_;
};
TimerImpl::TimerImpl(DispatcherImpl& dispatcher, TimerCb cb) : cb_(cb) {
ASSERT(cb_);
evtimer_assign(
&raw_event_, &dispatcher.base(),
[](evutil_socket_t, short, void* arg) -> void {
static_cast
this);
}
disableTimer被調用時其內部會調用event_del來刪除事件,使事件成為非未決狀態(tài),enableTimer被調用時則間接調用event_add使事件成為未決狀態(tài),這樣一旦超時時間到了就會觸發(fā)超時事件。
void TimerImpl::disableTimer() { event_del(&raw_event_); }
void TimerImpl::enableTimer(const std::chrono::milliseconds& d) {
if (d.count() == 0) {
event_active(&raw_event_, EV_TIMEOUT, 0);
} else {
std::chrono::microseconds us =
std::chrono::duration_cast
timeval tv;
tv.tv_sec = us.count() / 1000000;
tv.tv_usec = us.count() % 1000000;
event_add(&raw_event_, &tv);
}
}
上面的代碼在計算timer時間timeval的時候實現(xiàn)的并不優(yōu)雅,應該避免使用像1000000這樣的不具備可讀性的數(shù)字常量,社區(qū)中有人建議可以改成如下的形式。auto secs = std::chrono::duration_cast
auto usecs =
std::chrono::duration_cast
tv.tv_secs = secs.count();
tv.tv_usecs = usecs.count();
FileEvent
socket套接字相關的事件被封裝為FileEvent,其上暴露了二個接口:activate用于主動觸發(fā)事件,典型的使用場景比如: 喚醒EventLoop、Write Buffer有數(shù)據(jù),可以主動觸發(fā)下可寫事件(Envoy中的典型使用場景)等;setEnabled用于設置事件類型,將事件添加到EventLoop中使其成為未決狀態(tài)。
void FileEventImpl::activate(uint32_t events) {
int libevent_events = 0;
if (events & FileReadyType::Read) {
libevent_events |= EV_READ;
}
if (events & FileReadyType::Write) {
libevent_events |= EV_WRITE;
}
if (events & FileReadyType::Closed) {
libevent_events |= EV_CLOSED;
}
ASSERT(libevent_events);
event_active(&raw_event_, libevent_events, 0);
}
void FileEventImpl::setEnabled(uint32_t events) {
event_del(&raw_event_);
assignEvents(events);
event_add(&raw_event_, nullptr);
}
任務隊列
Dispatcher的內部有一個任務隊列,也會創(chuàng)建一個線程專們處理任務隊列中的任務。通過Dispatcher的post方法可以將任務投遞到任務隊列中,交給Dispatcher內的線程去處理。
void DispatcherImpl::post(std::function
bool do_post;
{
Thread::LockGuard lock(post_lock_);
do_post = post_callbacks_.empty();
post_callbacks_.push_back(callback);
}
if (do_post) {
post_timer_->enableTimer(std::chrono::milliseconds(0));
}
}
post方法將傳遞進來的callback所代表的任務,添加到post_callbacks_所代表的類型為vector
DispatcherImpl::DispatcherImpl(TimeSystem& time_system,
Buffer::WatermarkFactoryPtr&& factory)
: ......
post_timer_(createTimer([this]() -> void { runPostCallbacks(); })),
current_to_delete_(&to_delete_1_) {
RELEASE_ASSERT(Libevent::Global::initialized(), "");
}
runPostCallbacks是一個while循環(huán),每次都從post_callbacks_中取出一個callback所代表的任務去運行,直到post_callbacks_為空。每次運行runPostCallbacks都會確保所有的任務都執(zhí)行完。顯然,在runPostCallbacks被線程執(zhí)行的期間如果post進來了新的任務,那么新任務直接追加到post_callbacks_尾部即可,而無需做喚醒線程這一動作。
void DispatcherImpl::runPostCallbacks() {
while (true) {
std::function
{
Thread::LockGuard lock(post_lock_);
if (post_callbacks_.empty()) {
return;
}
callback = post_callbacks_.front();
post_callbacks_.pop_front();
}
callback();
}
}
DeferredDeletable
最后講一下Dispatcher中比較難理解也很重要的DeferredDeletable,它是一個空接口,所有要進行延遲析構的對象都要繼承自這個空接口。在Envoy的代碼中像下面這樣繼承自DeferredDeletable的類隨處可見。
class DeferredDeletable {
public:
virtual ~DeferredDeletable() {}
};
那何為延遲析構呢?用在哪個場景呢?延遲析構指的是將析構的動作交由Dispatcher來完成,所以DeferredDeletable和Dispatcher密切相關。Dispatcher對象有一個vector保存了所有要延遲析構的對象。
class DispatcherImpl : public Dispatcher {
......
private:
........
std::vector
std::vector
std::vector
}
to_delete_1_和to_delete_2_就是用來存放所有的要延遲析構的對象,這里使用兩個vector存放,為什么要這樣做呢?。current_to_delete_始終指向當前正要析構的對象列表,每次執(zhí)行完析構后就交替指向另外一個對象列表,來回交替。
void DispatcherImpl::clearDeferredDeleteList() {
ASSERT(isThreadSafe());
std::vector
size_t num_to_delete = to_delete->size();
if (deferred_deleting_ || !num_to_delete) {
return;
}
ENVOY_LOG(trace, "clearing deferred deletion list (size={})", num_to_delete);
if (current_to_delete_ == &to_delete_1_) {
current_to_delete_ = &to_delete_2_;
} else {
current_to_delete_ = &to_delete_1_;
}
deferred_deleting_ = true;
for (size_t i = 0; i < num_to_delete; i++) {
(*to_delete)[i].reset();
}
to_delete->clear();
deferred_deleting_ = false;
}
上面的代碼在執(zhí)行對象析構的時候先使用to_delete來指向當前正要析構的對象列表,然后將current_to_delete_指向另外一個列表,這樣在添加延遲刪除的對象時,就可以做到安全的把對象添加到列表中了。因為deferredDelete和clearDeferredDeleteList都是在同一個線程中運行,所以current_to_delete_是一個普通的指針,可以安全的更改指針指向另外一個,而不用擔心有線程安全問題。
void DispatcherImpl::deferredDelete(DeferredDeletablePtr&& to_delete) {
ASSERT(isThreadSafe());
current_to_delete_->emplace_back(std::move(to_delete));
ENVOY_LOG(trace, "item added to deferred deletion list (size={})", current_to_delete_->size());
if (1 == current_to_delete_->size()) {
deferred_delete_timer_->enableTimer(std::chrono::milliseconds(0));
}
}
當有要進行延遲析構的對象時,調用deferredDelete即可,這個函數(shù)內部會通過current_to_delete_把對象放到要延遲析構的列表中,最后判斷下當前要延遲析構的列表大小是否是1,如果是1表明這是第一次添加延遲析構的對象,那么就需要通過deferred_delete_timer_把背后的線程喚醒執(zhí)行clearDeferredDeleteList函數(shù)。這樣做的原因是避免多次喚醒,因為有一種情況是線程已經喚醒了正在執(zhí)行clearDeferredDeleteList,在這個過程中又有其他的對象需要析構而加入到vector中。

到此為止deferredDelete的實現(xiàn)原理就基本分析完了,可以看出它的實現(xiàn)和任務隊列的實現(xiàn)很類似,只不過一個是循環(huán)執(zhí)行callback所代表的任務,另一個是對對象進行析構。最后我們來看一下deferredDelete的應用場景,卻“為何要進行延遲析構?”在Envoy的源代碼中經常會看到像下面這樣的代碼片段。
ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher,
ConnectionSocketPtr&& socket,
TransportSocketPtr&& transport_socket,
bool connected) {
......
}
// 傳遞裸指針到回調中
file_event_ = dispatcher_.createFileEvent(
fd(), [this](uint32_t events) -> void { onFileEvent(events); },
Event::FileTriggerType::Edge,
Event::FileReadyType::Read | Event::FileReadyType::Write);
......
}
傳遞給Dispatcher的callback都是通過裸指針的方式進行回調,如果進行回調的時候對象已經析構了,就會出現(xiàn)野指針的問題,我相信C++水平還可以的同學都會看出這個問題,除非能在邏輯上保證Dispatcher的生命周期比所有對象都短,這樣就能保證在回調的時候對象肯定不會析構,但是這不可能成立的,因為Dispatcher是EventLoop的核心。
一個線程運行一個EventLoop直到線程結束,Dispatcher對象才會析構,這意味著Dispatcher對象的生命周期是最長的。所以從邏輯上沒辦法保證進行回調的時候對象沒有析構。可能有人會有疑問,對象在析構的時候把注冊的事件取消不就可以避免野指針的問題嗎? 那如果事件已經觸發(fā)了,callback正在等待運行呢? 又或者callback運行了一半呢?前者libevent是可以保證的,在調用event_del的時候可以把處于等待運行的事件取消掉,但是后者就無能為力了,這個時候如果對象析構了,那行為就是未定義了。沿著這個思路想一想,是不是只要保證對象析構的時候沒有callback正在運行就可以解決問題了呢?是的,只要保證所有在執(zhí)行中的callback執(zhí)行完了,再做對象析構就可以了。可以利用Dispatcher是順序執(zhí)行所有callback的特點,向Dispatcher中插入一個任務就是用來對象析構的,那么當這個任務執(zhí)行的時候是可以保證沒有其他任何callback在運行。通過這個方法就完美解決了這里遇到的野指針問題了。
或許有人又會想,這里是不是可以用shared_ptr和shared_from_this來解這個呢? 是的,這是解決多線程環(huán)境下對象析構的秘密武器,通過延長對象的生命周期,把對象的生命周期延長到和callback一樣,等callback執(zhí)行完再進行析構,同樣可以達到效果,但是這帶來了兩個問題,第一就是對象生命周期被無限拉長,雖然延遲析構也拉長了生命周期,但是時間是可預期的,一旦EventLoop執(zhí)行了clearDeferredDeleteList任務就會立刻被回收,而通過shared_ptr的方式其生命周期取決于callback何時運行,而callback何時運行這個是沒辦法保證的,比如一個等待socket的可讀事件進行回調,如果對端一直不發(fā)送數(shù)據(jù),那么callback就一直不會被運行,對象就一直無法被析構,長時間累積會導致內存使用率上漲。第二就是在使用方式上侵入性較強,需要強制使用shared_ptr的方式創(chuàng)建對象。
總結
Dispatcher總的來說其實現(xiàn)還是比較簡單明了的,比較容易驗證其正確性,同樣功能也相對較弱,和chromium的MessageLoop、boost的asio都是相似的用途,但是功能上差得比較多。好在這是專門給Envoy設計的,而且Envoy的場景也比較單一,不必做成那么通用的。另外一個我覺得比較奇怪的是,為什么在DeferredDeletable的實現(xiàn)中要用to_delete_1_和to_delete_2_兩個隊列交替來存放,其實按照我的理解一個隊列即可,因為clearDeferredDeleteList和deferredDelete是保證在同一個線程中執(zhí)行的,就和Dispatcher的任務隊列一樣,用一個隊列保存所有要執(zhí)行的任務,循環(huán)的執(zhí)行即可。但是Envoy中沒有這樣做,我理解這樣設計的原因可能是因為相比于任務隊列來說延遲析構的重要性更低一些,大量對象的析構如果保存在一個隊列中循環(huán)的進行析構勢必會影響其他關鍵任務的執(zhí)行,所以這里拆分成兩個隊列,多個任務交替的執(zhí)行,就好比把一個大任務拆分成了好幾個小任務順序來執(zhí)行。
附錄
Libevent狀態(tài)轉換圖
阿里云雙十一1折拼團活動:已滿6人,都是最低折扣了
【滿6人】1核2G云服務器99.5元一年298.5元三年 2核4G云服務器545元一年 1227元三年
【滿6人】1核1G MySQL數(shù)據(jù)庫 119.5元一年
【滿6人】3000條國內短信包 60元每6月
參團地址:http://click.aliyun.com/m/1000020293/
電子發(fā)燒友App





























評論