Micro实践(四)

异步消息处理是构建可伸缩,高度容错系统的关键技术。尽管功能强大,但开发起来也很繁琐,应考虑许多技术细节。它远没有同步系统简单和直接。 幸运的是,Micro对该编程模型做了非常优雅的抽象和封装。因此,为了方便我们可以使用它。 此外,借助Micro的接口抽象,我们可以透明(或几乎透明)地支持各种消息代理。 Micro默认情况下提供基于HTTP的消息代理。同时,它还通过插件为主流消息代理提供广泛支持,包括Kafka,RabbitMQ,Nats,MQTT,NSQ,Amazon SQS等。这使我们在切换消息代理时几乎无需修改任何业务代码。 Micro以两种不同的方式支持异步消息传递。 一个是Pub / Sub,另一个是通过micro.Broker接口处理消息。前者相对简单,而后者则提供了更大的灵活性。 Micro的内置发布/订阅功能统一并简化了消息的发送,接收,编码和解码。这使开发人员摆脱了潜在的技术细节,使他们专注于创造业务价值。在大多数情况下,我们应该选择这种方式。

订阅消息

首先,我们定义消息处理Handler。./subscriber/hello.go的代码如下:

package subscriber
import (
   "context"
   "github.com/micro/go-micro/util/log"
hello "hello/proto/hello"
)
type Hello struct{}
func (e *Hello) Handle(ctx context.Context, msg *hello.Message) error {
   log.Log("Handler Received message: ", msg.Say)
   return nil
}
func Handler(ctx context.Context, msg *hello.Message) error {
   log.Log("Function Received message: ", msg.Say)
   return nil
}

只要签名是func(context.Context, v interface{}) error,函数的功能或方法都可以用于消息处理。 请注意,处理程序函数的第二个参数是*hello.Message,它在.proto文件中定义。Micro会自动解码消息,因此我们可以直接在消息处理程序中使用它。 准备消息处理程序后,您需要对其进行注册。./main.go的相关代码如下:

...
// Register Struct as Subscriber
micro.RegisterSubscriber("com.foo.srv.hello", service.Server(), new(subscriber.Hello))
// Register Function as Subscriber
micro.RegisterSubscriber("com.foo.srv.hello", service.Server(), subscriber.Handler)
...

上面的代码注册了两个消息处理处理程序,这些处理程序将接收来自名为“ com.foo.srv.hello ” 的主题的消息。 如果您想更好地控制订阅行为,则需要将其他参数传递给micro.RegisterSubscriber。首先让我们看一下该方法的签名:

func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error

第一个参数代表主题。第二个参数是server.Server,可以从中获取service.Server()。第三个参数是消息处理程序。 最后一个是一个可选参数,用于控制订阅的行为。其类型为server.SubscriberOption。目前,Micro提供了4种内置选项:

  1. server.DisableAutoAck(), 在处理完消息后,禁用对消息的自动确认。
  2. server.SubscriberContext(ctx context.Context),设置上下文选项以允许传递代理SubscriberOption。
  3. server.InternalSubscriber(b bool),指定不将订户发布到发现系统
  4. server.SubscriberQueue(n字符串),共享队列名在订阅者之间分发消息。

Micro默认情况下为每个订阅者实例创建一个全局唯一的队列。如果要与多个订阅者实例共享队列,则需要通过以下方式显式指定队列名称:

micro.RegisterSubscriber("com.foo.srv.hello", service.Server(), subscriber.Handler, server.SubscriberQueue("foo_bar"))

这样,订阅实例将共享相同的队列。因此,将消息分发到某个节点进行处理,避免重复处理同一消息。 考虑到在多个节点上运行多个服务实例是分布式系统中的常见情况,我的建议是:除非您知道自己在做什么,否则始终明确指定队列名称,即使当前只有一个预订实例。最常见的做法是使用主题为队列命名。

发布消息

创建一个发布消息的项目名称pub,其结构如下:

.
├── main.go
├── plugin.go
├── proto/hello
│   └── hello.proto
│   └── hello.pb.go
│   └── hello.pb.micro.go
├── go.mod
├── go.sum

除了main.go之外,其他文件的内容与上一篇文章中描述的相同,在此不再赘述。

这是main.go中的代码:

package main

import (
	"context"
	"log"
	"time"

	"github.com/micro/go-micro/v2"

	hello "hello/proto/hello"
)

func main() {
	// New Service
	service := micro.NewService(
		micro.Name("com.foo.srv.hello.pub"), // name the client service
	)
	// Initialise service
	service.Init()

	// create publisher
	pub := micro.NewEvent("com.foo.srv.hello", service.Client())

	// publish message every second
	for now := range time.Tick(time.Second) {
		if err := pub.Publish(context.TODO(), &hello.Message{Say: now.String()}); err != nil {
			log.Fatal("publish err", err)
		}
	}
}

与订阅功能类似,发布界面还支持选项,这些选项可用于控制发布行为。micro.Publisher接口的定义如下:

// Publisher is syntactic sugar for publishing
type Publisher interface {
   Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error
}

当前,Micro仅提供一种内置的发布选项:

client.WithExchange(e string)PublishOption,设置交换以路由消息通过。

运行

准备pub项目后,运行hello服务器和pub项目。 然后,我们将在hello服务器的控制台中看到接收到的消息的日志:

$ go run main.go plugin.go
2020-02-14 14:18:24.368336 I | Transport [http] Listening on [::]:56970
2020-02-14 14:18:24.368429 I | Broker [http] Connected to [::]:56971
2020-02-14 14:18:24.368680 I | Registry [mdns] Registering node: com.foo.srv.hello-14b7ea99-167f-4136-ad11-ae22d45ed302
2020-02-14 14:18:24.370575 I | Subscribing com.foo.srv.hello-14b7ea99-167f-4136-ad11-ae22d45ed302 to topic: com.foo.srv.hello
2020-02-14 14:18:24.370784 I | Subscribing com.foo.srv.hello-14b7ea99-167f-4136-ad11-ae22d45ed302 to topic: com.foo.srv.hello
2020-02-14 14:18:40.415610 I | Handler Received message: 2020-02-14 14:18:40.309255 +0800 CST m=+1.007480205
2020-02-14 14:18:40.415651 I | Function Received message: 2020-02-14 14:18:40.309255 +0800 CST m=+1.007480205
2020-02-14 14:18:41.310969 I | Handler Received message: 2020-02-14 14:18:41.310352 +0800 CST m=+2.008611968
2020-02-14 14:18:41.310999 I | Function Received message: 2020-02-14 14:18:41.310352 +0800 CST m=+2.008611968
...

结论

Micro完全支持异步消息传递。它通过既支持高级发布/订阅模型,也支持低级别操作micro.Broker。 Pub / Sub大大简化了异步消息传递的开发,因此我们可以专注于业务逻辑而不是技术细节。 开发人员只需要定义发布者、订阅者和消息内容。所有其他工作均由框架完成。不再需要考虑异步消息传递系统中的常见问题,例如消息路由,重传和接收确认,并且也不必考虑消息的编码和解码。 当然,这种简化也带来一些限制。如果发布/订阅无法满足您的需求,可以使用micro.Broker。


欢迎转载,本文地址: https://blog.prodrich.com/detail/16/

带着使命来到世上的你,给他人提供价值,才有价值