Preamble
- Continuing the series, today is my sharing on how to re-implement the pubsub pattern using golang channel. Let’s go, guys!
Pubsub
- First we’ll get a full definition from the wiki :
In software architecture, publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers, but instead categorize published messages into classes without knowledge of which subscribers, if any , there may be . - Pubsub is a message pattern in which the publisher just sends the message and doesn’t care if any subscribers receive it and the messages will be sorted and sent regardless of whether there are any subscribers or not. Publishers and subscribers do not know each other’s existence. In some pubsub systems, there will be an additional component called a broker, which will be responsible for sorting and sending messages.
- Pubsub or message queue is generally used quite commonly in micro-service architectures. It provides a method for services to communicate with each other asynchronously. In addition, we will have some use cases for messaging queues in real-world scenarios such as: Sending emails, Data post-processing, Batch updates for databases…
Use case 3: Build pubsub service with buffered channel
- Based on the feature that the channel is a queue, I will have a simple demo about pubsub as follows:123456789101112131415type Message struct {topic stringcontent interface{}}type MessageChannel chan Messagefunc main() {maxMessage := 10000topic := "update-user"messageQueue := make(chan Message, maxMessage)mapTopicMessage := make(map[string][]MessageChannel) // map[topic][]MessageChannel}
messageQueue : contains a list of messages to be processed.MessageChannel : communication channel between publishers and subscribers. MessageChannel will now act as the heart of the system.
mapTopicMessage : contains a map between the topic and the list of message channels. A topic will be subscribed by many subscribers, so we will have a 1:N relationship. It acts as the management of topics and message channels.
- Principle operate:
messageQueue when receiving a new message, the service will filter out the list of MessageChannel corresponding to the topic of that message and send the new message to. Each subscriber will communicate with the MessageChannel to get the message.12345678910111213141516171819202122232425...func main() {maxMessage := 10000topic := "update-user"messageQueue := make(chan Message, maxMessage)mapTopicMessage := make(map[string][]MessageChannel)go run(messageQueue, mapTopicMessage)}func run(messageQueue chan Message, mapTopicMessage map[string][]MessageChannel) {for {message := <-messageQueuelistMessageChannel, ok := mapTopicMessage[message.topic]if ok {for _, messageChannel := range listMessageChannel {messageChannel <- message}}}} - Publish message1234567891011121314151617181920212223242526func main() {maxMessage := 10000topic := "update-user"messageQueue := make(chan Message, maxMessage)mapTopicMessage := make(map[string][]MessageChannel)go run(messageQueue, mapTopicMessage)// publishpublish(messageQueue, topic, "user-name is update to Hung")time.Sleep(time.Second * 10)}func publish(messageQueue chan Message, topic string, content string) {message := Message{topic: topic,content: content,}messageQueue <- messagefmt.Printf("%v: publish new message with topic: '%v' - content: '%v' n", time.Now().Format("15:04:05"), message.topic, message.content)}result:08:46:28: publish new message with topic: 'update-user' - content: 'user-name is update to Hung'
- Register Subscription123456789101112131415161718192021func main() {...// subcribesub1 := registerSubscription(mapTopicMessage, topic)...}func registerSubscription(mapTopicMessage map[string][]MessageChannel, topic string) MessageChannel {newMessageChannel := make(MessageChannel)value, ok := mapTopicMessage[topic]if ok {value = append(value, newMessageChannel)mapTopicMessage[topic] = value} else {mapTopicMessage[topic] = []MessageChannel{newMessageChannel}}return newMessageChannel}
when there is a “register subscription” request, the service will return a MessageChannel . Subscriber will communicate with that MessageChannel to receive the message. - Subscribe123456789func subcribe(messageChannel MessageChannel) {go func() {for {message := <-messageChannelfmt.Printf("%v: receive new message with topic: '%v' - content: '%v' n", time.Now().Format("15:04:05"), message.topic, message.content)}}()}
- Running and see what happen!12345678910111213141516171819202122func main() {maxMessage := 10000topic := "update-user"messageQueue := make(chan Message, maxMessage)mapTopicMessage := make(map[string][]MessageChannel) // map[topic][]MessageChannelgo run(messageQueue, mapTopicMessage)// register subcriptionssub1 := registerSubscription(mapTopicMessage, topic)// publishpublish(messageQueue, topic, "user-name is update to Hung")subcribe(sub1)time.Sleep(time.Second * 10)}result:09:16:02: publish new message with topic: 'update-user' - content: 'user-name is update to Hung'09:16:02: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
- Add more subscriber12345678910111213141516171819202122func main() {...// register subcriptionssub1 := registerSubscription(mapTopicMessage, topic)sub2 := registerSubscription(mapTopicMessage, topic)sub3 := registerSubscription(mapTopicMessage, topic)// publishpublish(messageQueue, topic, "user-name is update to Hung")subcribe(sub1)subcribe(sub2)subcribe(sub3)time.Sleep(time.Second * 10)}result:09:20:15: publish new message with topic: 'update-user' - content: 'user-name is update to Hung'09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
Build a complete pubsub
- to be continued…
Ending
- to be continued…