这篇文章上次修改于 903 天前,可能其部分内容已经发生变化,如有疑问可询问作者。
1 观察者模式
1.1 简介
- 定义
观察者模式定义了一系列对象之间的一对多关系。当一个对象改变状态,其它依赖者都会收到通知。
优点
- 下一月让主题和观察者之间松耦合。主题只知道观察者实现了 Observer 接口,但是不需要知道观察者是谁,做了什么或其它细节。主题唯一依赖的是一个实现 Observer 接口的对象列表,所以可以随时添加、删除观察者。
- 观察者模式提供了一种对象设计模式,让主题和观察者之间松耦合。为什么呢?主题只知道观察者实现了某个接口(Observer 接口),但是不需要知道观察者的具体类是谁、做了什么,或者其它细节。
- 任何时候都可以添加新的观察者。因为主题唯一依赖的东西是一个实现 Observer 接口的对象列表。
- 当有新的观察者出现,主题的代码不需要修改,只需要在新类里实现观察者接口,然后注册为观察者即可。主题只需要向实现了该接口的所有对象发送通知。
- 改变主题或观察者其中一方并不会影响另一方,因为两者是松耦合的,只要它们之间的接口仍被遵守,就可以自由修改。下一月
- 设计原则:为了交互对象之间的松耦合设计而努力。松耦合的设计可以让我们建立有弹性的系统,能够应对变化,因为对象之间的互相依赖降到了最低。
- 适用场景
当一个对象状态的改变需要改变其他对象,或实际对象是事先未知的或动态变化的时,可使用观察者模式。 - 推:推的方式被认为更“正确”。但观察者可能会被强迫收到一堆数据。
- 拉:如果主题需要增加更多的状态,不用修改和更新对每个观察者的调用,只需要改变自己允许更多的 getter 方法来取得新增的状态。但观察者可能需要调用多次才能收集全所需要的状态。
简单示例(没有处理异常情况,没有禁止拷贝构造函数,没有将 .cc 和 .h 拆开)
#include <iostream> #include <vector> // 观察者定义 class Observer { public: virtual void Update(const float& temperature) = 0; }; // 主题定义 class Subject { public: virtual bool RegisterObserver(Observer* observer) = 0; virtual bool RemoveObserver(Observer* observer) = 0; virtual void NotifyObservers() = 0; }; // 具体主题 class WeatherData : Subject { public: bool RegisterObserver(Observer* observer) override; bool RemoveObserver(Observer* observer) override; void NotifyObservers() override; void SetTemperature(const float& temperature); private: float temperature_; std::vector<Observer*> observers_; }; bool WeatherData::RegisterObserver(Observer* observer) { if (observer == nullptr) { return false; } observers_.push_back(observer); return true; } bool WeatherData::RemoveObserver(Observer* observer) { auto iter = std::find(observers_.begin(), observers_.end(), observer); if (iter == observers_.end()) { return false; } observers_.erase(iter); return true; } // 当温度发生变化时,通知观察者 void WeatherData::SetTemperature(const float& temperature) { temperature_ = temperature; NotifyObservers(); } void WeatherData::NotifyObservers() { for (int i = 0; i < observers_.size(); i++) { observers_[i]->Update(temperature_); } } // 具体观察者 class WeatherDataObserver : Observer { public: void Update(const float& temperature) override; }; void WeatherDataObserver::Update(const float& temperature) { std::cout << temperature << std::endl; }
1.2 autocar 中的示例
1.2.1 module 的观察者
Module 作为主题,它只需要知道观察者实现了 OnIterationBegin() 和 OnIterationEnd() 接口即可。
对外提供 AddIterationObserver() 随时添加观察者,提供 RemoveIterationObserver() 随时删除观察者。
class Module {
public:
class IterationObserver {
public:
virtual ~IterationObserver() = default;
virtual void OnIterationBegin(Module* module) = 0;
virtual void OnIterationEnd(Module* module) = 0;
};
...
bool AddIterationObserver(IterationObserver* observer);
bool RemoveIterationObserver(IterationObserver* observer);
bool HasIterationObserver(IterationObserver* observer) const;
...
private:
...
std::vector<IterationObserver*> observers_;
...
}
bool Module::AddIterationObserver(IterationObserver* observer) {
if (HasIterationObserver(observer)) {
return false;
}
observers_.emplace_back(observer);
return true;
}
bool Module::RemoveIterationObserver(IterationObserver* observer) {
auto iter = std::find(observers_.begin(), observers_.end(), observer);
if (iter == observers_.end()) {
return false;
}
observers_.erase(iter);
return true;
}
在模块的迭代周期中的开始和结束时刻通知观察者。
int Module::OneIteration() {
...
for (auto& observer : observers_) {
observer->OnIterationBegin(this);
}
...
IterationRoutine();
...
for (auto& observer : observers_) {
observer->OnIterationEnd(this);
}
...
return 0;
}
接下来看下在 autocar 中 module 的观察者有哪些。
例如,在 onboard_data_collector.cc 中注册过 module 的观察者。
void OnboardDataCollector::InstallOnModule(Module* module) {
module->AddIterationObserver(this);
}
它主要是用来记录 module 执行的开始和结束时间,并将这些信息异步落盘,以便后续分析模块性能。
void OnboardDataCollector::OnIterationBegin(Module* module) {
int64_t timestamp = OnboardClockTime();
int64_t iteration_id = module->iteration_num();
const std::string& node_name = module->node()->name();
worker_thread_->AddTask([this, timestamp, node_name, iteration_id]() {
auto message = GenerateInvokeEvent(node_name, iteration_id);
PrepareHeader(timestamp, message.get());
event_recorder_->AsyncRecordEvent(message);
});
}
void OnboardDataCollector::OnIterationEnd(Module* module) {
int64_t timestamp = OnboardClockTime();
int64_t iteration_id = module->iteration_num();
const std::string& node_name = module->node()->name();
worker_thread_->AddTask([this, timestamp, node_name, iteration_id]() {
auto message = GenerateInvokeFinishEvent(node_name, iteration_id);
PrepareHeader(timestamp, message.get());
event_recorder_->AsyncRecordEvent(message);
});
}
1.2.2 node 的观察者
1.2.2.1 性能信息收集观察者
Node 提供如下接口用来注册性能信息收集观察者:
void Node::RegisterPublishObserver(
std::function<void(Node* node, std::shared_ptr<const OnboardMessage>)> observer) {
publish_observer_.emplace_back(observer);
}
void Node::RegisterPreReceiveObserver(
std::function<void(Node* node, std::shared_ptr<const OnboardMessage>)> observer) {
pre_receive_observer_.emplace_back(observer);
}
void Node::RegisterPostReceiveObserver(
std::function<void(Node* node, std::shared_ptr<const OnboardMessage>)> observer) {
post_receive_observer_.emplace_back(observer);
}
性能信息收集观察者用来在发布消息时、接收消息前和接收消息后打点,并将这些打点信息落盘,用来分析传输过程时延和回调时延:
void OnboardDataCollector::InstallOnNode(Node* node) {
node->RegisterPublishObserver([this](Node* node, std::shared_ptr<const OnboardMessage> message) {
int64_t timestamp = OnboardClockTime();
worker_thread_->AddTask([this, timestamp, node, message]() {
if (IsRecordEnabled(message)) {
event_recorder_->AsyncRecordEvent(message);
}
auto schedule_message = GeneratePublisherEvent(node, *message);
PrepareHeader(timestamp, schedule_message.get());
event_recorder_->AsyncRecordEvent(schedule_message);
});
});
node->RegisterPreReceiveObserver([this](Node* node, std::shared_ptr<const OnboardMessage> message) {
int64_t timestamp = OnboardClockTime();
worker_thread_->AddTask([this, timestamp, node, message]() {
auto update_message = GeneratePreUpdateEvent(node, *message);
PrepareHeader(timestamp, update_message.get());
event_recorder_->AsyncRecordEvent(update_message);
});
});
node->RegisterPostReceiveObserver([this](Node* node, std::shared_ptr<const OnboardMessage> message) {
int64_t timestamp = OnboardClockTime();
worker_thread_->AddTask([this, timestamp, node, message]() {
auto update_message = GenerateUpdateEvent(node, *message);
PrepareHeader(timestamp, update_message.get());
event_recorder_->AsyncRecordEvent(update_message);
});
});
}
传输过程时延:
回调过程时延:
1.2.2.2 topic 观察者
Node 提供如下函数用来注册 Topic 观察者,在接受到消息是执行一些操作:
void Node::RegisterTopicObserver(const std::string& topic, MessageCallback callback) {
message_context_->RegisterTopicObserver(topic, callback);
}
每个 Node 都有一个 MessageContext,实际上会被注册到 MessageContextt 中。:
void MessageContext::RegisterTopicObserver(const std::string& topic, MessageCallback callback) {
...
topic_observer_[topic]->emplace_back(callback);
...
}
注册 topic 观察者示例 1:将消息计数加 1。
class CounterModule : public Module {
public:
...
base::Status Initialize() override {
for (const auto &subscriber : node()->config().subscriber()) {
node()->RegisterTopicObserver(
subscriber.topic_name(), [this](std::shared_ptr<const OnboardMessage> message) {
topic_counter_[message->header().topic_name()]++;
return true;
});
}
initialized_ = true;
return base::Status::OK();
}
...
}
1.2.2.3 观察者被通知的地方
发送消息时:
void Node::PublishMessage(const std::string& topic_name, std::shared_ptr<OnboardMessage> message) {
...
PublishMessageWithHeader(topic_name, message);
}
void Node::PublishMessageWithHeader(const std::string& topic_name, const OnboardMessage& message) {
...
PublishMessageWithHeader(topic_name, publish_message);
}
void Node::PublishMessageWithHeader(const std::string& topic_name,
std::shared_ptr<OnboardMessage> message) {
...
CallPublishObservers(message);
...
}
void Node::CallPublishObservers(std::shared_ptr<const OnboardMessage> message) {
base::MutexLock lock(&publish_mutex_);
for (auto observer : publish_observer_) {
DCHECK(observer != nullptr);
observer(this, message);
}
}
收到消息时:
void MessageContext::OnMessageReceived(const OnboardMessage& message) {
...
OnMessageReceived(shared_message);
}
void MessageContext::OnMessageReceived(std::shared_ptr<const OnboardMessage> message) {
...
CallObserver(message);
}
void MessageContext::CallObserver(std::shared_ptr<const OnboardMessage> message) {
...
for (auto item : pre_receive_observer_) {
item.callback(message);
}
const std::vector<MessageCallback>& observers = *(topic_observer_[topic_name]);
for (auto callback : observers) {
callback(message);
}
for (auto item : post_receive_observer_) {
item.callback(message);
}
}
2 订阅者模式
2.1 与观察者模式的区别
观察者负责监控主题状态的更新,如果主题的状态发生了变化,那么观察者根据状态的变更执行相关的操作。即,观察模式定义了主题与观察者的直接交互或通信关系。
发布订阅模式中存在发布者、订阅者和消息中心,订阅者需要向消息中心指定自己对哪些数据感兴趣,消息中心根据订阅者订阅信息推送数据。即,发布者和订阅者之间引入了消息中心,实现的是间接通信。
观察者模式:
- 观察者和主题之间是松耦合,主题需要记录有哪些观察者,观察者需要了解主题。
发布者和订阅者模式:
- 订阅者和发布者之间互不了解,它们在代理或消息队列的帮助下进行通信,实现了完全解耦。
2.2 autocar 中的示例
Publisher Node 只需要在特定 Topic 上发布消息,Subscriber Node 只需要在指定 Topic 上订阅消息,发布者和订阅者之间不需要知道彼此的存在。
NodeConfig 定义了发布-订阅者关系:Canbus 会在 Topic: "/walle/alert_event" 上发布消息,并订阅了 Topic: "/apollo/control" 上的消息。
name: "Canbus"
module_name: "Canbus"
...
publisher {
topic_name: "/walle/alert_event"
message_buffer_size: 10
latch: true
}
subscriber {
topic_name: "/apollo/control"
message_buffer_size: 10
}
...
发布订阅关系建立:Node 实际上通过 Comms 进行通信,Node.Initialize() 会调用 Comms.Initialize() 来订阅指定的主题。CommsContext 维护所有 Comms 之间的发布订阅关系。
异步模式:将 comms 自身注册到指定的 topic 中。
bool DirectComms::Initialize(Node* node) {
...
for (const auto &subscriber : node->config().subscriber()) {
const std::string& topic_name = subscriber.topic_name();
context_->Subscribe(topic_name, this);
}
...
}
void DirectCommsContext::Subscribe(const std::string& topic, DirectComms* comms) {
observer_[topic].emplace_back(comms);
}
同步模式:将 OnMessageReceived(message) 注册到指定的 Topic 中。
OnMessageReceived(message) 会通知前面提到的性能信息收集观察者和 Topic 观察者。
bool SyncComms::Initialize(Node* node) {
...
for (const auto &subscriber : node->config().subscriber()) {
const std::string& topic_name = subscriber.topic_name();
auto callback = [&](std::shared_ptr<const OnboardMessage> message) {
node_->message_context()->OnMessageReceived(message);
};
context_->Subscribe(topic_name, callback);
}
...
}
void SyncCommsContext::Subscribe(const std::string& topic, MessageCallback callback) {
observer_[topic].emplace_back(callback);
}
Node 发布消息:调用 Comms.PublishMessage(),该函数会根据消息所属的 Topic 查找所有的订阅者。
void Node::PublishMessage(const std::string& topic_name, std::shared_ptr<OnboardMessage> message) {
...
PublishMessageWithHeader(topic_name, message);
}
void Node::PublishMessageWithHeader(const std::string& topic_name,
std::shared_ptr<OnboardMessage> message) {
...
comms_->PublishMessage(message);
}
异步模式:将 OnMessageReceived(message) 加入到任务队列中。
void DirectComms::PublishMessage(std::shared_ptr<OnboardMessage> message) {
std::string topic_name = message->header().topic_name();
...
auto subscribers = context_->GetObservers(topic_name);
...
for (DirectComms* comms : subscribers) {
comms->OnMessageReceived(message);
}
}
void DirectComms::OnMessageReceived(std::shared_ptr<const OnboardMessage> message) {
...
topic_queue_->AddTask([message, this]() {
...
node_->message_context()->OnMessageReceived(message);
}, message->header().timestamp(), topic_name);
...
}
同步模式:直接调用 OnMessageReceived(message)。
void SyncComms::PublishMessage(std::shared_ptr<OnboardMessage> message) {
...
context_->Publish(message);
}
void SyncCommsContext::Publish(std::shared_ptr<OnboardMessage> message) {
...
auto callbacks = observer_[message->header().topic_name()];
for (auto callback : callbacks) {
callback(message);
}
}
参考
Eric Freeman, Elisabeth Freeman, Kathy Sierra and Bert Bates. Head First 设计模式 M. 北京:中国电力出版社. 2010.
聂鹏程. 分布式通信之发布订阅:送货上门 E. 2019.
没有评论