这篇文章上次修改于 903 天前,可能其部分内容已经发生变化,如有疑问可询问作者。

1 观察者模式

1.1 简介

  • 定义
    观察者模式定义了一系列对象之间的一对多关系。当一个对象改变状态,其它依赖者都会收到通知。

img #50%

  • 优点

    • 下一月让主题和观察者之间松耦合。主题只知道观察者实现了 Observer 接口,但是不需要知道观察者是谁,做了什么或其它细节。主题唯一依赖的是一个实现 Observer 接口的对象列表,所以可以随时添加、删除观察者。
    • 观察者模式提供了一种对象设计模式,让主题和观察者之间松耦合。为什么呢?主题只知道观察者实现了某个接口(Observer 接口),但是不需要知道观察者的具体类是谁、做了什么,或者其它细节。
    • 任何时候都可以添加新的观察者。因为主题唯一依赖的东西是一个实现 Observer 接口的对象列表。
    • 当有新的观察者出现,主题的代码不需要修改,只需要在新类里实现观察者接口,然后注册为观察者即可。主题只需要向实现了该接口的所有对象发送通知。
    • 改变主题或观察者其中一方并不会影响另一方,因为两者是松耦合的,只要它们之间的接口仍被遵守,就可以自由修改。下一月
  • 设计原则:为了交互对象之间的松耦合设计而努力。松耦合的设计可以让我们建立有弹性的系统,能够应对变化,因为对象之间的互相依赖降到了最低。
  • 适用场景
    当一个对象状态的改变需要改变其他对象,或实际对象是事先未知的或动态变化的时,可使用观察者模式。
  • 推:推的方式被认为更“正确”。但观察者可能会被强迫收到一堆数据。
  • 拉:如果主题需要增加更多的状态,不用修改和更新对每个观察者的调用,只需要改变自己允许更多的 getter 方法来取得新增的状态。但观察者可能需要调用多次才能收集全所需要的状态。
  • 简单示例(没有处理异常情况,没有禁止拷贝构造函数,没有将 .cc 和 .h 拆开)

    #include <iostream>
    #include <vector>
    
    // 观察者定义
    class Observer {
     public:
      virtual void Update(const float& temperature) = 0;
    };
    
    // 主题定义
    class Subject {
     public:
      virtual bool RegisterObserver(Observer* observer) = 0;
      virtual bool RemoveObserver(Observer* observer) = 0;
      virtual void NotifyObservers() = 0;
    };
    
    // 具体主题
    class WeatherData : Subject {
     public:
      bool RegisterObserver(Observer* observer) override;
      bool RemoveObserver(Observer* observer) override;
      void NotifyObservers() override;
      void SetTemperature(const float& temperature);
    
     private:
      float temperature_;
      std::vector<Observer*> observers_;
    };
    
    bool WeatherData::RegisterObserver(Observer* observer) {
        if (observer == nullptr) {
            return false;
        }
        observers_.push_back(observer);
        return true;
    }
    
    bool WeatherData::RemoveObserver(Observer* observer) {
      auto iter = std::find(observers_.begin(), observers_.end(), observer);
      if (iter == observers_.end()) {
          return false;
      }
      observers_.erase(iter);
      return true;
    }
    
    // 当温度发生变化时,通知观察者
    void WeatherData::SetTemperature(const float& temperature) {
        temperature_ = temperature;
        NotifyObservers();
    }
    
    void WeatherData::NotifyObservers() {
        for (int i = 0; i < observers_.size(); i++) {
            observers_[i]->Update(temperature_);
        }
    }
    
    // 具体观察者
    class WeatherDataObserver : Observer {
     public:
      void Update(const float& temperature) override;
    };
    
    void WeatherDataObserver::Update(const float& temperature) {
        std::cout << temperature << std::endl;
    }

1.2 autocar 中的示例

1.2.1 module 的观察者

Module 作为主题,它只需要知道观察者实现了 OnIterationBegin() 和 OnIterationEnd() 接口即可。

对外提供 AddIterationObserver() 随时添加观察者,提供 RemoveIterationObserver() 随时删除观察者。

class Module {
 public:
  class IterationObserver {
   public:
    virtual ~IterationObserver() = default;
    virtual void OnIterationBegin(Module* module) = 0;
    virtual void OnIterationEnd(Module* module) = 0;
  };
  ...
  bool AddIterationObserver(IterationObserver* observer);
  bool RemoveIterationObserver(IterationObserver* observer);
  bool HasIterationObserver(IterationObserver* observer) const;
  ...
  
 private:
  ...
  std::vector<IterationObserver*> observers_;
  ...
}

bool Module::AddIterationObserver(IterationObserver* observer) {
  if (HasIterationObserver(observer)) {
    return false;
  }

  observers_.emplace_back(observer);
  return true;
}

bool Module::RemoveIterationObserver(IterationObserver* observer) {
  auto iter = std::find(observers_.begin(), observers_.end(), observer);
  if (iter == observers_.end()) {
    return false;
  }

  observers_.erase(iter);
  return true;
}

在模块的迭代周期中的开始和结束时刻通知观察者。

int Module::OneIteration() {
  ...
  for (auto& observer : observers_) {
    observer->OnIterationBegin(this);
  }
  ...
  IterationRoutine();
  ...
  for (auto& observer : observers_) {
    observer->OnIterationEnd(this);
  }
  ...
  return 0;
}

接下来看下在 autocar 中 module 的观察者有哪些。

例如,在 onboard_data_collector.cc 中注册过 module 的观察者。

void OnboardDataCollector::InstallOnModule(Module* module) {
  module->AddIterationObserver(this);
}

它主要是用来记录 module 执行的开始和结束时间,并将这些信息异步落盘,以便后续分析模块性能。

void OnboardDataCollector::OnIterationBegin(Module* module) {
  int64_t timestamp = OnboardClockTime();
  int64_t iteration_id = module->iteration_num();
  const std::string& node_name = module->node()->name();
  worker_thread_->AddTask([this, timestamp, node_name, iteration_id]() {
      auto message = GenerateInvokeEvent(node_name, iteration_id);
      PrepareHeader(timestamp, message.get());
      event_recorder_->AsyncRecordEvent(message);
    });
}

void OnboardDataCollector::OnIterationEnd(Module* module) {
  int64_t timestamp = OnboardClockTime();
  int64_t iteration_id = module->iteration_num();
  const std::string& node_name = module->node()->name();
  worker_thread_->AddTask([this, timestamp, node_name, iteration_id]() {
      auto message = GenerateInvokeFinishEvent(node_name, iteration_id);
      PrepareHeader(timestamp, message.get());
      event_recorder_->AsyncRecordEvent(message);
    });
}

img #50%

1.2.2 node 的观察者

1.2.2.1 性能信息收集观察者

Node 提供如下接口用来注册性能信息收集观察者:

void Node::RegisterPublishObserver(
    std::function<void(Node* node, std::shared_ptr<const OnboardMessage>)> observer) {
  publish_observer_.emplace_back(observer);
}

void Node::RegisterPreReceiveObserver(
    std::function<void(Node* node, std::shared_ptr<const OnboardMessage>)> observer) {
  pre_receive_observer_.emplace_back(observer);
}

void Node::RegisterPostReceiveObserver(
    std::function<void(Node* node, std::shared_ptr<const OnboardMessage>)> observer) {
  post_receive_observer_.emplace_back(observer);
}

性能信息收集观察者用来在发布消息时、接收消息前和接收消息后打点,并将这些打点信息落盘,用来分析传输过程时延和回调时延:

void OnboardDataCollector::InstallOnNode(Node* node) {
  node->RegisterPublishObserver([this](Node* node, std::shared_ptr<const OnboardMessage> message) {
      int64_t timestamp = OnboardClockTime();
      worker_thread_->AddTask([this, timestamp, node, message]() {
          if (IsRecordEnabled(message)) {
            event_recorder_->AsyncRecordEvent(message);
          }
          auto schedule_message = GeneratePublisherEvent(node, *message);
          PrepareHeader(timestamp, schedule_message.get());
          event_recorder_->AsyncRecordEvent(schedule_message);
        });
    });

  node->RegisterPreReceiveObserver([this](Node* node, std::shared_ptr<const OnboardMessage> message) {
      int64_t timestamp = OnboardClockTime();
      worker_thread_->AddTask([this, timestamp, node, message]() {
          auto update_message = GeneratePreUpdateEvent(node, *message);
          PrepareHeader(timestamp, update_message.get());
          event_recorder_->AsyncRecordEvent(update_message);
        });
    });

  node->RegisterPostReceiveObserver([this](Node* node, std::shared_ptr<const OnboardMessage> message) {
      int64_t timestamp = OnboardClockTime();
      worker_thread_->AddTask([this, timestamp, node, message]() {
          auto update_message = GenerateUpdateEvent(node, *message);
          PrepareHeader(timestamp, update_message.get());
          event_recorder_->AsyncRecordEvent(update_message);
        });
    });
}

传输过程时延:

img

回调过程时延:

img

1.2.2.2 topic 观察者

Node 提供如下函数用来注册 Topic 观察者,在接受到消息是执行一些操作:

void Node::RegisterTopicObserver(const std::string& topic, MessageCallback callback) {
  message_context_->RegisterTopicObserver(topic, callback);
}

每个 Node 都有一个 MessageContext,实际上会被注册到 MessageContextt 中。:

void MessageContext::RegisterTopicObserver(const std::string& topic, MessageCallback callback)  {
 ...
 topic_observer_[topic]->emplace_back(callback);
 ...
}

注册 topic 观察者示例 1:将消息计数加 1。

class CounterModule : public Module {
 public:
  ...
  base::Status Initialize() override {
    for (const auto &subscriber : node()->config().subscriber()) {
      node()->RegisterTopicObserver(
          subscriber.topic_name(), [this](std::shared_ptr<const OnboardMessage> message) {
            topic_counter_[message->header().topic_name()]++;
            return true;
          });
    }

    initialized_ = true;
    return base::Status::OK();
  }
  ...
}

1.2.2.3 观察者被通知的地方

发送消息时:

void Node::PublishMessage(const std::string& topic_name, std::shared_ptr<OnboardMessage> message) {
  ...
  PublishMessageWithHeader(topic_name, message);
}

void Node::PublishMessageWithHeader(const std::string& topic_name, const OnboardMessage& message) {
  ...
  PublishMessageWithHeader(topic_name, publish_message);
}

void Node::PublishMessageWithHeader(const std::string& topic_name,
                                          std::shared_ptr<OnboardMessage> message) {
  ...
  CallPublishObservers(message);
  ...
}

void Node::CallPublishObservers(std::shared_ptr<const OnboardMessage> message) {
  base::MutexLock lock(&publish_mutex_);
  for (auto observer : publish_observer_) {
    DCHECK(observer != nullptr);
    observer(this, message);
  }
}

收到消息时:

void MessageContext::OnMessageReceived(const OnboardMessage& message) {
  ...
  OnMessageReceived(shared_message);
}

void MessageContext::OnMessageReceived(std::shared_ptr<const OnboardMessage> message) {
  ...
  CallObserver(message);
}

void MessageContext::CallObserver(std::shared_ptr<const OnboardMessage> message) {
  ...
  for (auto item : pre_receive_observer_) {
    item.callback(message);
  }

  const std::vector<MessageCallback>& observers = *(topic_observer_[topic_name]);
  for (auto callback : observers) {
    callback(message);
  }

  for (auto item : post_receive_observer_) {
    item.callback(message);
  }
}

2 订阅者模式

img

2.1 与观察者模式的区别

观察者负责监控主题状态的更新,如果主题的状态发生了变化,那么观察者根据状态的变更执行相关的操作。即,观察模式定义了主题与观察者的直接交互或通信关系。

发布订阅模式中存在发布者、订阅者和消息中心,订阅者需要向消息中心指定自己对哪些数据感兴趣,消息中心根据订阅者订阅信息推送数据。即,发布者和订阅者之间引入了消息中心,实现的是间接通信。

观察者模式:

  • 观察者和主题之间是松耦合,主题需要记录有哪些观察者,观察者需要了解主题。

发布者和订阅者模式:

  • 订阅者和发布者之间互不了解,它们在代理或消息队列的帮助下进行通信,实现了完全解耦。

img

2.2 autocar 中的示例

Publisher Node 只需要在特定 Topic 上发布消息,Subscriber Node 只需要在指定 Topic 上订阅消息,发布者和订阅者之间不需要知道彼此的存在。

NodeConfig 定义了发布-订阅者关系:Canbus 会在 Topic: "/walle/alert_event" 上发布消息,并订阅了 Topic: "/apollo/control" 上的消息。

name: "Canbus"
module_name: "Canbus"

...

publisher {
  topic_name:  "/walle/alert_event"
  message_buffer_size: 10
  latch: true
}

subscriber {
  topic_name: "/apollo/control"
  message_buffer_size: 10
}

...

发布订阅关系建立:Node 实际上通过 Comms 进行通信,Node.Initialize() 会调用 Comms.Initialize() 来订阅指定的主题。CommsContext 维护所有 Comms 之间的发布订阅关系。

异步模式:将 comms 自身注册到指定的 topic 中。

bool DirectComms::Initialize(Node* node) {
    ...
  for (const auto &subscriber : node->config().subscriber()) {
      const std::string& topic_name = subscriber.topic_name();
      context_->Subscribe(topic_name, this);
  }
  ...
}

void DirectCommsContext::Subscribe(const std::string& topic, DirectComms* comms) {
  observer_[topic].emplace_back(comms);
}

同步模式:将 OnMessageReceived(message) 注册到指定的 Topic 中。

OnMessageReceived(message) 会通知前面提到的性能信息收集观察者和 Topic 观察者。

bool SyncComms::Initialize(Node* node) {
  ...
  for (const auto &subscriber : node->config().subscriber()) {
    const std::string& topic_name = subscriber.topic_name();
    auto callback = [&](std::shared_ptr<const OnboardMessage> message) {
      node_->message_context()->OnMessageReceived(message);
    };
    context_->Subscribe(topic_name, callback);
  }
  ...
}

void SyncCommsContext::Subscribe(const std::string& topic, MessageCallback callback) {
  observer_[topic].emplace_back(callback);
}

Node 发布消息:调用 Comms.PublishMessage(),该函数会根据消息所属的 Topic 查找所有的订阅者。

void Node::PublishMessage(const std::string& topic_name, std::shared_ptr<OnboardMessage> message) {
  ...
  PublishMessageWithHeader(topic_name, message);
}

void Node::PublishMessageWithHeader(const std::string& topic_name,
                                    std::shared_ptr<OnboardMessage> message) {
  ...
  comms_->PublishMessage(message);  
}

异步模式:将 OnMessageReceived(message) 加入到任务队列中。

void DirectComms::PublishMessage(std::shared_ptr<OnboardMessage> message) {
  std::string topic_name = message->header().topic_name();
  ...
  auto subscribers = context_->GetObservers(topic_name);
  ...
  for (DirectComms* comms : subscribers) {
    comms->OnMessageReceived(message);
  }
}

void DirectComms::OnMessageReceived(std::shared_ptr<const OnboardMessage> message) {
  ...
  topic_queue_->AddTask([message, this]() {
      ...
      node_->message_context()->OnMessageReceived(message);
    }, message->header().timestamp(), topic_name);
  ...
}

同步模式:直接调用 OnMessageReceived(message)。

void SyncComms::PublishMessage(std::shared_ptr<OnboardMessage> message) {
  ...
  context_->Publish(message);
}

void SyncCommsContext::Publish(std::shared_ptr<OnboardMessage> message) {
  ...
  auto callbacks = observer_[message->header().topic_name()];
  for (auto callback : callbacks) {
    callback(message);
  }
}

参考

Eric Freeman, Elisabeth Freeman, Kathy Sierra and Bert Bates. Head First 设计模式 M. 北京:中国电力出版社. 2010.

聂鹏程. 分布式通信之发布订阅:送货上门 E. 2019.