异步消息处理是构建可伸缩,高度容错系统的关键技术。尽管功能强大,但开发起来也很繁琐,应考虑许多技术细节。它远没有同步系统简单和直接。 幸运的是,Micro对该编程模型做了非常优雅的抽象和封装。因此,为了方便我们可以使用它。 此外,借助Micro的接口抽象,我们可以透明(或几乎透明)地支持各种消息代理。 Micro默认情况下提供基于HTTP的消息代理。同时,它还通过插件为主流消息代理提供广泛支持,包括Kafka,RabbitMQ,Nats,MQTT,NSQ,Amazon SQS等。这使我们在切换消息代理时几乎无需修改任何业务代码。 Micro以两种不同的方式支持异步消息传递。 一个是Pub / Sub,另一个是通过micro.Broker接口处理消息。前者相对简单,而后者则提供了更大的灵活性。 Micro的内置发布/订阅功能统一并简化了消息的发送,接收,编码和解码。这使开发人员摆脱了潜在的技术细节,使他们专注于创造业务价值。在大多数情况下,我们应该选择这种方式。
首先,我们定义消息处理Handler。./subscriber/hello.go的代码如下:
package subscriber
import (
"context"
"github.com/micro/go-micro/util/log"
hello "hello/proto/hello"
)
type Hello struct{}
func (e *Hello) Handle(ctx context.Context, msg *hello.Message) error {
log.Log("Handler Received message: ", msg.Say)
return nil
}
func Handler(ctx context.Context, msg *hello.Message) error {
log.Log("Function Received message: ", msg.Say)
return nil
}
只要签名是func(context.Context, v interface{}) error,函数的功能或方法都可以用于消息处理。 请注意,处理程序函数的第二个参数是*hello.Message,它在.proto文件中定义。Micro会自动解码消息,因此我们可以直接在消息处理程序中使用它。 准备消息处理程序后,您需要对其进行注册。./main.go的相关代码如下:
...
// Register Struct as Subscriber
micro.RegisterSubscriber("com.foo.srv.hello", service.Server(), new(subscriber.Hello))
// Register Function as Subscriber
micro.RegisterSubscriber("com.foo.srv.hello", service.Server(), subscriber.Handler)
...
上面的代码注册了两个消息处理处理程序,这些处理程序将接收来自名为“ com.foo.srv.hello ” 的主题的消息。 如果您想更好地控制订阅行为,则需要将其他参数传递给micro.RegisterSubscriber。首先让我们看一下该方法的签名:
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error
第一个参数代表主题。第二个参数是server.Server,可以从中获取service.Server()。第三个参数是消息处理程序。 最后一个是一个可选参数,用于控制订阅的行为。其类型为server.SubscriberOption。目前,Micro提供了4种内置选项:
Micro默认情况下为每个订阅者实例创建一个全局唯一的队列。如果要与多个订阅者实例共享队列,则需要通过以下方式显式指定队列名称:
micro.RegisterSubscriber("com.foo.srv.hello", service.Server(), subscriber.Handler, server.SubscriberQueue("foo_bar"))
这样,订阅实例将共享相同的队列。因此,将消息分发到某个节点进行处理,避免重复处理同一消息。 考虑到在多个节点上运行多个服务实例是分布式系统中的常见情况,我的建议是:除非您知道自己在做什么,否则始终明确指定队列名称,即使当前只有一个预订实例。最常见的做法是使用主题为队列命名。
创建一个发布消息的项目名称pub,其结构如下:
.
├── main.go
├── plugin.go
├── proto/hello
│ └── hello.proto
│ └── hello.pb.go
│ └── hello.pb.micro.go
├── go.mod
├── go.sum
除了main.go之外,其他文件的内容与上一篇文章中描述的相同,在此不再赘述。
这是main.go中的代码:
package main
import (
"context"
"log"
"time"
"github.com/micro/go-micro/v2"
hello "hello/proto/hello"
)
func main() {
// New Service
service := micro.NewService(
micro.Name("com.foo.srv.hello.pub"), // name the client service
)
// Initialise service
service.Init()
// create publisher
pub := micro.NewEvent("com.foo.srv.hello", service.Client())
// publish message every second
for now := range time.Tick(time.Second) {
if err := pub.Publish(context.TODO(), &hello.Message{Say: now.String()}); err != nil {
log.Fatal("publish err", err)
}
}
}
与订阅功能类似,发布界面还支持选项,这些选项可用于控制发布行为。micro.Publisher接口的定义如下:
// Publisher is syntactic sugar for publishing
type Publisher interface {
Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error
}
当前,Micro仅提供一种内置的发布选项:
client.WithExchange(e string)PublishOption,设置交换以路由消息通过。
准备pub项目后,运行hello服务器和pub项目。 然后,我们将在hello服务器的控制台中看到接收到的消息的日志:
$ go run main.go plugin.go
2020-02-14 14:18:24.368336 I | Transport [http] Listening on [::]:56970
2020-02-14 14:18:24.368429 I | Broker [http] Connected to [::]:56971
2020-02-14 14:18:24.368680 I | Registry [mdns] Registering node: com.foo.srv.hello-14b7ea99-167f-4136-ad11-ae22d45ed302
2020-02-14 14:18:24.370575 I | Subscribing com.foo.srv.hello-14b7ea99-167f-4136-ad11-ae22d45ed302 to topic: com.foo.srv.hello
2020-02-14 14:18:24.370784 I | Subscribing com.foo.srv.hello-14b7ea99-167f-4136-ad11-ae22d45ed302 to topic: com.foo.srv.hello
2020-02-14 14:18:40.415610 I | Handler Received message: 2020-02-14 14:18:40.309255 +0800 CST m=+1.007480205
2020-02-14 14:18:40.415651 I | Function Received message: 2020-02-14 14:18:40.309255 +0800 CST m=+1.007480205
2020-02-14 14:18:41.310969 I | Handler Received message: 2020-02-14 14:18:41.310352 +0800 CST m=+2.008611968
2020-02-14 14:18:41.310999 I | Function Received message: 2020-02-14 14:18:41.310352 +0800 CST m=+2.008611968
...
Micro完全支持异步消息传递。它通过既支持高级发布/订阅模型,也支持低级别操作micro.Broker。 Pub / Sub大大简化了异步消息传递的开发,因此我们可以专注于业务逻辑而不是技术细节。 开发人员只需要定义发布者、订阅者和消息内容。所有其他工作均由框架完成。不再需要考虑异步消息传递系统中的常见问题,例如消息路由,重传和接收确认,并且也不必考虑消息的编码和解码。 当然,这种简化也带来一些限制。如果发布/订阅无法满足您的需求,可以使用micro.Broker。
带着使命来到世上的你,给他人提供价值,才有价值