Mục Đích
- Call api tới AWS Simple Queue Service(SQS) thông qua sdk
Code Mẫu
Chuẩn bị
- cài đặt sdk aws, run:
1 2 | go get -u github.com/aws/aws-sdk-go/ |
- cấu hình access-key and secret-key.
Tạo một session
- tạo một session để tái sử dụng trong những function khác
- profile được dùng là default, để biết profile đang được cấu hình, run trên linux/mac
1 2 | cat ~/.aws/credentials |
- function để tạo 1 session
1 2 3 4 5 6 7 8 9 10 11 12 13 | func GetSession() *session.Session { sess, err := session.NewSessionWithOptions(session.Options{ Profile: "default", Config: aws.Config{ Region: aws.String("us-west-1"), }, }) if err != nil { panic(err) } return sess } |
Cách tạo ra 1 queue
- sử dụng function
CreateQueue
trongaws-sdk
để tạo 1 queue cho việc test. - có một số parameter quan trọng cần lưu ý:
queueName
: name của queue mà bạn muốn tạoDelaySeconds
: thời gian message bạn muốn giữ lại trước khi gửi đi,VisibilityTimeout
: thời gian một message trước khi expire.
- function create 1 queue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | func CreateQueue(sess *session.Session, queueName string) (*sqs.CreateQueueOutput, error) { sqsClient := sqs.New(sess) result, err := sqsClient.CreateQueue(&sqs.CreateQueueInput{ QueueName: &queueName, Attributes: map[string]*string{ "DelaySeconds": aws.String("0"), "VisibilityTimeout": aws.String("60"), }, }) if err != nil { return nil, err } return result, nil } |
Get URL của 1 queue
- Tất cả các apis điều yêu cầu url của queue, vì thế sử dụng
GetQueueURL
để truy vấn url của queue-name - function get url
1 2 3 4 5 6 7 8 9 10 11 | func GetQueueURL(sess *session.Session, queue string) (*sqs.GetQueueUrlOutput, error) { sqsClient := sqs.New(sess) result, err := sqsClient.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: &queue, }) if err != nil { return nil, err } return result, nil } |
Gửi message đến queue
- chúng ta sẽ sử dụng hàm
SendMessage
từaws-sdk
để gửi message đến queue - một vài paramters quan trọng cần chú ý trong lúc gửi message:
QueueUrl
: url của queue muốn gửi message.- MessageBody: body được gửi đến queue, có thể là json-string hoặc string
- function gửi message
1 2 3 4 5 6 7 8 9 10 11 12 | func SendMessage(sess *session.Session, queueUrl string, messageBody string) (*sqs.SendMessageOutput, error) { sqsClient := sqs.New(sess) sendOut, err := sqsClient.SendMessage(&sqs.SendMessageInput{ QueueUrl: &queueUrl, MessageBody: aws.String(messageBody), }) if err != nil { return nil, err } return sendOut, nil } |
Nhận message từ queue
- chúng ta sẽ sử dụng hàm
ReceiveMessage
từaws-sdk
để lấy message từ queue - một vài paramters quan trọng cần được chú ý trong lúc nhận message:
QueueUrl
: url của queue muốn nhận message.MaxNumberOfMessages
: tổng số message có thể nhận được.
- function receive messsage:
1 2 3 4 5 6 7 8 9 10 11 12 | func GetMessages(sess *session.Session, queueUrl string, maxMessages int) (*sqs.ReceiveMessageOutput, error) { sqsClient := sqs.New(sess) msgResult, err := sqsClient.ReceiveMessage(&sqs.ReceiveMessageInput{ QueueUrl: &queueUrl, MaxNumberOfMessages: aws.Int64(int64(maxMessages)), }) if err != nil { return nil, err } return msgResult, nil } |
Xoá message trong queue
- khi nhận message từ queue, message sẽ không tự động xoá ra khỏi queue.
- consumer khác có thể nhận message sau thời gian
VisibilityTimeout
hết hạn. - để đảm bảo rằng không bị trùng message thì cần phải xoá.
- chúng ta sẽ sử dụng hàm
DeleteMessage
từaws-sdk
để xoá message từ queue - cần cung cấp
ReceiptHandle
trong thành phần của method - function xoá message:
1 2 3 4 5 6 7 8 9 | func DeleteMessage(sess *session.Session, queueUrl string, messageHandle *string) error { sqsClient := sqs.New(sess) _, err := sqsClient.DeleteMessage(&sqs.DeleteMessageInput{ QueueUrl: &queueUrl, ReceiptHandle: messageHandle, }) return err } |
Xoá tất cả message trong queue
- chúng ta sẽ sử dụng hàm
PurgeQueue
từaws-sdk
để xoá tất cả message trong queue - một vài paramters quan trọng cần được chú ý trong lúc xoá tất cả message:
- function xoá tất cả message:
1 2 3 4 5 6 7 8 | func PurgeQueue(sess *session.Session, queueUrl string) error { sqsClient := sqs.New(sess) _, err := sqsClient.PurgeQueue(&sqs.PurgeQueueInput{ QueueUrl: aws.String(queueUrl), }) return err } |