Mở đầu
- Tiếp tục series, hôm nay là một buổi chia sẽ của tôi về cách implement lại pubsub pattern bằng golang channel. Let’s go, guys!
Pubsub
- Trước hết ta sẽ có một định nghĩa đầy đủ từ wiki :
In software architecture, publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers, but instead categorize published messages into classes without knowledge of which subscribers, if any, there may be. - Pubsub là một message pattern mà ở trong đó publisher chỉ việc gửi message và không cần quan tâm đến có subscriber nào nhận hay không và các message sẽ được phân loại và gủi đi mà không cần quan tâm xem có subscribers nào hay không. Publishers và subscribers không biết sự tồn tại của nhau. Ở một số hệ thông pubsub, sẽ có thêm 1 thành phần là broker, nó sẽ được đảm nhiệm phân loại và gửi message.
- Pubsub hay message queue nói chung được sử dụng khá phổ biến trong micro-service architectures. Nó cung cấp một phương thức giúp các service giao tiếp với nhau một cách bất đồng bộ. Ngoài ra, chúng ta sẽ có một vài use cases for messaging queues in real-world scenarios như là: Sending emails, Data post-processing, Batch updates for databases …
Use case 3: Build pubsub service with buffered channel
- Dựa vào đặc tính channel là một queue, tôi sẽ có 1 simple demo về pubsub như sau:123456789101112131415type Message struct {topic stringcontent interface{}}type MessageChannel chan Messagefunc main() {maxMessage := 10000topic := "update-user"messageQueue := make(chan Message, maxMessage)mapTopicMessage := make(map[string][]MessageChannel) // map[topic][]MessageChannel}
messageQueue: chứa danh sách các message sẽ xủ lí.MessageChannel: kênh giao tiếp giữa các publisher và subcriber. MessageChannel lúc này sẽ đóng vai trò như một trái tim của hệ thống.
mapTopicMessage: chứa một bản map giữa topic và danh sách các message channel. Một topic sẽ được subcribe bởi nhiều subcriber nên ta sẽ quan hệ 1:N. Nó đóng vai trò như việc quản lý topic và các message channel.
- Nguyên lý vận hành:
messageQueue khi nhận được một message mới, service sẽ lọc ra các danh sách MessageChannel tương ứng với topic của message đó và gửi message mới đến. Mỗi subcriber sẽ communicate with MessageChannel để lấy message.12345678910111213141516171819202122232425...func main() {maxMessage := 10000topic := "update-user"messageQueue := make(chan Message, maxMessage)mapTopicMessage := make(map[string][]MessageChannel)go run(messageQueue, mapTopicMessage)}func run(messageQueue chan Message, mapTopicMessage map[string][]MessageChannel) {for {message := <-messageQueuelistMessageChannel, ok := mapTopicMessage[message.topic]if ok {for _, messageChannel := range listMessageChannel {messageChannel <- message}}}} - Publish message1234567891011121314151617181920212223242526func main() {maxMessage := 10000topic := "update-user"messageQueue := make(chan Message, maxMessage)mapTopicMessage := make(map[string][]MessageChannel)go run(messageQueue, mapTopicMessage)// publishpublish(messageQueue, topic, "user-name is update to Hung")time.Sleep(time.Second * 10)}func publish(messageQueue chan Message, topic string, content string) {message := Message{topic: topic,content: content,}messageQueue <- messagefmt.Printf("%v: publish new message with topic: '%v' - content: '%v' n", time.Now().Format("15:04:05"), message.topic, message.content)}result:08:46:28: publish new message with topic: 'update-user' - content: 'user-name is update to Hung'
- Register Subscription123456789101112131415161718192021func main() {...// subcribesub1 := registerSubscription(mapTopicMessage, topic)...}func registerSubscription(mapTopicMessage map[string][]MessageChannel, topic string) MessageChannel {newMessageChannel := make(MessageChannel)value, ok := mapTopicMessage[topic]if ok {value = append(value, newMessageChannel)mapTopicMessage[topic] = value} else {mapTopicMessage[topic] = []MessageChannel{newMessageChannel}}return newMessageChannel}
khi có một “register subcription” request, service sẽ trả về một MessageChannel. Subcriber sẽ giao tiếp với MessageChannel đó để nhận message. - Subcribe123456789func subcribe(messageChannel MessageChannel) {go func() {for {message := <-messageChannelfmt.Printf("%v: receive new message with topic: '%v' - content: '%v' n", time.Now().Format("15:04:05"), message.topic, message.content)}}()}
- Running and see what happen!12345678910111213141516171819202122func main() {maxMessage := 10000topic := "update-user"messageQueue := make(chan Message, maxMessage)mapTopicMessage := make(map[string][]MessageChannel) // map[topic][]MessageChannelgo run(messageQueue, mapTopicMessage)// register subcriptionssub1 := registerSubscription(mapTopicMessage, topic)// publishpublish(messageQueue, topic, "user-name is update to Hung")subcribe(sub1)time.Sleep(time.Second * 10)}result:09:16:02: publish new message with topic: 'update-user' - content: 'user-name is update to Hung'09:16:02: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
- Add more subcriber12345678910111213141516171819202122func main() {...// register subcriptionssub1 := registerSubscription(mapTopicMessage, topic)sub2 := registerSubscription(mapTopicMessage, topic)sub3 := registerSubscription(mapTopicMessage, topic)// publishpublish(messageQueue, topic, "user-name is update to Hung")subcribe(sub1)subcribe(sub2)subcribe(sub3)time.Sleep(time.Second * 10)}result:09:20:15: publish new message with topic: 'update-user' - content: 'user-name is update to Hung'09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
Xây dựng một pubsub hoàn chỉnh
- to be continued …
Tạm kết
- to be continued …