これはGCPのCloud PubSubのチュートリアルをやってみただけの自分用のメモ。この記事で紹介されているサンプルコードはGitHubに置いてある。
Cloud PubSubとは
GCPで提供されているメッセージキュー。メッセージの送信をして(publish)、複数のシステムがそのメッセージを受信(subscribe)することができる。
登場する概念
- Message: PubSubを通じて送受信したいメッセージ
- Publisher: Messageを生成しTopicに送るもの
- Topic: Messageの送信先
- Subscription: Topicに対して紐付けられるMessageを受信するもの。1つのTopicに対して複数のSubscriptionを設定することができる
- Subscriber: Subscriptionと関連付けられた、Messageを受信するシステム
詳細はドキュメントに書いてある。
イメージ
使ってみる
APIを有効にする
gcloud services listで検索して
$ gcloud config set project oinume-pubsub-sample
$ gcloud services list --available | grep pubsub
pubsub.googleapis.com Cloud Pub/Sub API
enableで有効にする。
$ gcloud services enable pubsub.googleapis.com
Operation "operations/acf.dd53c399-e6be-48c3-a087-ff01c982d0d8" finished successfully.
Topicの作成
first-topic
という名前のTopicを作る。
$ gcloud pubsub topics create first-topic
Created topic [projects/oinume-pubsub-sample/topics/first-topic].
Subscriptionの作成
first-topic
へのSubscriptionを作る。SubscriptionはPull型とPush型の2つの種類があるが、今回はPush型を使っている。この2つについて詳しく知りたい場合はドキュメントを参照。--push-endpoint
の詳細については次で説明する。
$ gcloud pubsub subscriptions create first-topic-subscription \
—topic first-topic \
—push-endpoint="https://oinume-pubsub-sample.appspot.com/_ah/push-handlers/first-topic“
Google App Engineでpush型のsubscriberを作る
先ほど --push-endpoint
で指定したendpointをGAEで実装してみる。これはPubSubのmessageがpublishされたときにGCPから呼ばれるもの。GitHubにコードを上げてあるが、 "/_ah/push-handlers/first-topic
のURLのHandlerを定義して、標準出力に来たmessageを出力するだけのコードである。
main.go
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"os"
)
func main() {
http.HandleFunc("/", indexHandler)
http.HandleFunc("/_ah/push-handlers/first-topic", pushHandler)
port := os.Getenv("PORT")
if port == "" {
port = "8080"
log.Printf("Defaulting to port %s\n", port)
}
log.Printf("Listening on port %s", port)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", port), nil))
}
func indexHandler(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
http.NotFound(w, r)
return
}
fmt.Fprint(w, "Hello, World!!")
}
func pushHandler(w http.ResponseWriter, r *http.Request) {
type pushRequest struct {
Message struct {
Attributes map[string]string
Data []byte
ID string `json:"message_id"`
}
Subscription string
}
message := &pushRequest{}
if err := json.NewDecoder(r.Body).Decode(message); err != nil {
log.Printf("Could not decode body: %v\n", err)
http.Error(w, fmt.Sprintf("Could not decode body: %v", err), http.StatusBadRequest)
return
}
log.Printf("Data = %v\n", string(message.Message.Data))
fmt.Fprint(w, "ok")
}
messageをpublishしてみる
以下のような感じでpublisherを実行すると first-topic
にmessageがpublishされる。 oinume-pubsub-sample
は自身のGCPプロジェクト名に置き換えること。
$ export GOOGLE_CLOUD_PROJECT=oinume-pubsub-sample
$ export GO111MODULE=on
$ go run cmd/publisher/publisher.go first-topic "Hello world"
実際にmessageのsubscribeが成功したかどうかはGCPのStackdriver Loggingで見ることができる。
疑問
- push-handlerがHTTPで200以外を返した場合、どのぐらいのタイミングで再送される?
- 再送する回数の上限は指定できる?