oinume journal

Scratchpad of what I learned

GCPのCloud PubSubをGoで動かしてみただけのメモ

これはGCPのCloud PubSubのチュートリアルをやってみただけの自分用のメモ。この記事で紹介されているサンプルコードはGitHubに置いてある。

Cloud PubSubとは

GCPで提供されているメッセージキュー。メッセージの送信をして(publish)、複数のシステムがそのメッセージを受信(subscribe)することができる。

登場する概念

  • Message: PubSubを通じて送受信したいメッセージ
  • Publisher: Messageを生成しTopicに送るもの
  • Topic: Messageの送信先
  • Subscription: Topicに対して紐付けられるMessageを受信するもの。1つのTopicに対して複数のSubscriptionを設定することができる
  • Subscriber: Subscriptionと関連付けられた、Messageを受信するシステム

詳細はドキュメントに書いてある。

イメージ

使ってみる

APIを有効にする

gcloud services listで検索して

$ gcloud config set project oinume-pubsub-sample
$ gcloud services list --available | grep pubsub
pubsub.googleapis.com Cloud Pub/Sub API

enableで有効にする。

$ gcloud services enable pubsub.googleapis.com
Operation "operations/acf.dd53c399-e6be-48c3-a087-ff01c982d0d8" finished successfully.

Topicの作成

first-topic という名前のTopicを作る。

$ gcloud pubsub topics create first-topic
Created topic [projects/oinume-pubsub-sample/topics/first-topic].

Subscriptionの作成

first-topic へのSubscriptionを作る。SubscriptionはPull型とPush型の2つの種類があるが、今回はPush型を使っている。この2つについて詳しく知りたい場合はドキュメントを参照。--push-endpoint の詳細については次で説明する。

$ gcloud pubsub subscriptions create first-topic-subscription \
—topic first-topic \
—push-endpoint="https://oinume-pubsub-sample.appspot.com/_ah/push-handlers/first-topic“

Google App Engineでpush型のsubscriberを作る

先ほど --push-endpoint で指定したendpointをGAEで実装してみる。これはPubSubのmessageがpublishされたときにGCPから呼ばれるもの。GitHubにコードを上げてあるが、 "/_ah/push-handlers/first-topic のURLのHandlerを定義して、標準出力に来たmessageを出力するだけのコードである。

main.go

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "os"
)

func main() {
    http.HandleFunc("/", indexHandler)
    http.HandleFunc("/_ah/push-handlers/first-topic", pushHandler)

    port := os.Getenv("PORT")
    if port == "" {
        port = "8080"
        log.Printf("Defaulting to port %s\n", port)
    }

    log.Printf("Listening on port %s", port)
    log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", port), nil))
}

func indexHandler(w http.ResponseWriter, r *http.Request) {
    if r.URL.Path != "/" {
        http.NotFound(w, r)
        return
    }
    fmt.Fprint(w, "Hello, World!!")
}

func pushHandler(w http.ResponseWriter, r *http.Request) {
    type pushRequest struct {
        Message struct {
            Attributes map[string]string
            Data       []byte
            ID         string `json:"message_id"`
        }
        Subscription string
    }

    message := &pushRequest{}
    if err := json.NewDecoder(r.Body).Decode(message); err != nil {
        log.Printf("Could not decode body: %v\n", err)
        http.Error(w, fmt.Sprintf("Could not decode body: %v", err), http.StatusBadRequest)
        return
    }

    log.Printf("Data = %v\n", string(message.Message.Data))
    fmt.Fprint(w, "ok")
}

messageをpublishしてみる

以下のような感じでpublisherを実行すると first-topic にmessageがpublishされる。 oinume-pubsub-sample は自身のGCPプロジェクト名に置き換えること。

$ export GOOGLE_CLOUD_PROJECT=oinume-pubsub-sample
$ export GO111MODULE=on
$ go run cmd/publisher/publisher.go first-topic "Hello world"

実際にmessageのsubscribeが成功したかどうかはGCPのStackdriver Loggingで見ることができる。

疑問

  • push-handlerがHTTPで200以外を返した場合、どのぐらいのタイミングで再送される?
  • 再送する回数の上限は指定できる?

プログラマのためのGoogle Cloud Platform入門 サービスの全体像からクラウドネイティブアプリケーション構築まで

プログラマのためのGoogle Cloud Platform入門 サービスの全体像からクラウドネイティブアプリケーション構築まで