これはGCPのCloud PubSubのチュートリアルをやってみただけの自分用のメモ。この記事で紹介されているサンプルコードはGitHubに置いてある。
Cloud PubSubとは
GCPで提供されているメッセージキュー。メッセージの送信をして(publish)、複数のシステムがそのメッセージを受信(subscribe)することができる。
登場する概念
- Message: PubSubを通じて送受信したいメッセージ
- Publisher: Messageを生成しTopicに送るもの
- Topic: Messageの送信先
- Subscription: Topicに対して紐付けられるMessageを受信するもの。1つのTopicに対して複数のSubscriptionを設定することができる
- Subscriber: Subscriptionと関連付けられた、Messageを受信するシステム
詳細はドキュメントに書いてある。
イメージ
使ってみる
APIを有効にする
gcloud services listで検索して
$ gcloud config set project oinume-pubsub-sample $ gcloud services list --available | grep pubsub pubsub.googleapis.com Cloud Pub/Sub API
enableで有効にする。
$ gcloud services enable pubsub.googleapis.com Operation "operations/acf.dd53c399-e6be-48c3-a087-ff01c982d0d8" finished successfully.
Topicの作成
first-topic
という名前のTopicを作る。
$ gcloud pubsub topics create first-topic Created topic [projects/oinume-pubsub-sample/topics/first-topic].
Subscriptionの作成
first-topic
へのSubscriptionを作る。SubscriptionはPull型とPush型の2つの種類があるが、今回はPush型を使っている。この2つについて詳しく知りたい場合はドキュメントを参照。--push-endpoint
の詳細については次で説明する。
$ gcloud pubsub subscriptions create first-topic-subscription \ —topic first-topic \ —push-endpoint="https://oinume-pubsub-sample.appspot.com/_ah/push-handlers/first-topic“
Google App Engineでpush型のsubscriberを作る
先ほど --push-endpoint
で指定したendpointをGAEで実装してみる。これはPubSubのmessageがpublishされたときにGCPから呼ばれるもの。GitHubにコードを上げてあるが、 "/_ah/push-handlers/first-topic
のURLのHandlerを定義して、標準出力に来たmessageを出力するだけのコードである。
main.go
package main import ( "encoding/json" "fmt" "log" "net/http" "os" ) func main() { http.HandleFunc("/", indexHandler) http.HandleFunc("/_ah/push-handlers/first-topic", pushHandler) port := os.Getenv("PORT") if port == "" { port = "8080" log.Printf("Defaulting to port %s\n", port) } log.Printf("Listening on port %s", port) log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", port), nil)) } func indexHandler(w http.ResponseWriter, r *http.Request) { if r.URL.Path != "/" { http.NotFound(w, r) return } fmt.Fprint(w, "Hello, World!!") } func pushHandler(w http.ResponseWriter, r *http.Request) { type pushRequest struct { Message struct { Attributes map[string]string Data []byte ID string `json:"message_id"` } Subscription string } message := &pushRequest{} if err := json.NewDecoder(r.Body).Decode(message); err != nil { log.Printf("Could not decode body: %v\n", err) http.Error(w, fmt.Sprintf("Could not decode body: %v", err), http.StatusBadRequest) return } log.Printf("Data = %v\n", string(message.Message.Data)) fmt.Fprint(w, "ok") }
messageをpublishしてみる
以下のような感じでpublisherを実行すると first-topic
にmessageがpublishされる。 oinume-pubsub-sample
は自身のGCPプロジェクト名に置き換えること。
$ export GOOGLE_CLOUD_PROJECT=oinume-pubsub-sample $ export GO111MODULE=on $ go run cmd/publisher/publisher.go first-topic "Hello world"
実際にmessageのsubscribeが成功したかどうかはGCPのStackdriver Loggingで見ることができる。
疑問
- push-handlerがHTTPで200以外を返した場合、どのぐらいのタイミングで再送される?
- 再送する回数の上限は指定できる?

プログラマのためのGoogle Cloud Platform入門 サービスの全体像からクラウドネイティブアプリケーション構築まで
- 作者: 阿佐志保,中井悦司
- 出版社/メーカー: 翔泳社
- 発売日: 2017/06/02
- メディア: 単行本(ソフトカバー)
- この商品を含むブログを見る