graphql-ruby で始める GraphQL Subscription

いままで GraphQL で Query と Mutation を使う機会はあったけど Subscription は使ったことがなかった。
ということで今回は graphql-ruby を使って Subscription を使ってみました。のメモ。

できたもの

Anket で回答した結果をブラウザから確認できるのですが、その画面をリロードしなくても結果が更新されるようにしてみました。

Anket

やっていることとしては画面を開いた時に subscription をリクエストして、以降は画面を閉じるまでは Slack でアンケートの回答があった時にその結果を受け取って画面上に反映しています。
クライアントには Apollo を使っています。

手元で試すことができるのでぜひ使ってみてください。
そして壊れてたら教えてください!!

GraphQL の Subscription とは?

Subscription は指定したクエリを購読して、サーバー変更があった時にその情報を受け取るための仕組みで、更新情報をリアルタイムで受け取るようなユースケースで使うことができます。
仕様はこちらに書かれています。
graphql.github.io

仕様に例が書かれていますが、例えば Slack のようなチャットアプリケーションの場合を想定します。

subscription NewMessages {
  newMessage(roomId: 123) {
    sender
    text
  }
}

このようなクエリを発行することで roomID が 123 のメッセージを購読して sender と text を他の人がメッセージを投稿した時に受け取ります。
受け取った情報を画面に表示していくことで、ユーザーとしては画面をリロードしなくても他の人が投稿した情報を確認することができるようになります。

さきほどの仕様はあくまでもこういった動きにしよう!というものなので実際にどう実現するかは各言語の実装によります。

graphql-ruby における subscription

graphql-ruby.org

graphql-ruby における subscription の実装は現時点で3つあります。

  1. Action Cable による実装
  2. Pusher による実装
  3. Ably による実装

この中で 2, 3 は GraphQL::Pro にバンドルされているものなので、無料でゴニョゴニョしようと思うと Action Cable を使うことになるのかなと思います。
そのため今回は Action Cable を使用しました。
Action Cable について詳しく知りたい場合は↓こちらが参考になると思います。
railsguides.jp

Action Cable を使って Subscription

ドキュメントを見るとあんまり書いてなくてAPI docs にサンプルあるからみてね!という感じ。
https://graphql-ruby.org/subscriptions/action_cable_implementation.html

まずは Action Cable の Connection を実装します。
Anket はログインされていることが前提となっているので、コネクションID は current_user に設定しました。

module ApplicationCable
  class Connection < ActionCable::Connection::Base
    identified_by :current_user
 
    def connect
      self.current_user = find_verified_user
    end
 
    private
      
    def find_verified_user
      user = User.find_by(id: cookies.encrypted[:user_id])
      return user if user.present?

      reject_unauthorized_connection
    end
  end
end

次に GraphQL Subscription のための Action Cable の Channel の実装をします。
サンプル との主な違いとしては Connection で指定した current_user を指定しているくらいでその他の大きな違いはありません。

class GraphqlChannel < ApplicationCable::Channel
  def subscribed
    @subscription_ids = []
  end

  def execute(data)
    query = data["query"]
    variables = ensure_hash(data["variables"])
    operation_name = data["operationName"]
    context = {
      current_user: current_user,
      channel: self,
    }

    result = AnketSchema.execute({
      query: query,
      context: context,
      variables: variables,
      operation_name: operation_name
    })

    payload = {
      result: result.to_h,
      more: result.subscription?,
    }

    if result.context[:subscription_id]
      @subscription_ids << result.context[:subscription_id]
    end

    transmit(payload)
  end

  def unsubscribed
    @subscription_ids.each do |sid|
      AnketSchema.subscriptions.delete_subscription(sid)
    end
  end

  private

  def ensure_hash(ambiguous_param)
    case ambiguous_param
    when String
      if ambiguous_param.present?
        ensure_hash(JSON.parse(ambiguous_param))
      else
        {}
      end
    when Hash, ActionController::Parameters
      ambiguous_param
    when nil
      {}
    else
      raise ArgumentError, "Unexpected parameter: #{ambiguous_param}"
    end
  end
end

Action Cable 周りの実装はこれで終わりです。

Schema の定義

https://graphql-ruby.org/guides#subscriptions-guides
graphql-ruby で Subscription を動かすためには Base となるクラスを追加したりスキーマに対して Subscription を追加する必要があるのですがそれは↑のドキュメントに沿っていけばできるので、ここではスキーマ定義の部分だけ書きます。

graphql-ruby ではスキーマRuby のクラスを書いて定義します。
書き手としては GraphQL のスキーマのお作法を知らなくてもスキーマを定義していけるのは良いですね。

class Subscriptions::QuestionAnswered < Subscriptions::BaseSubscription
  argument :question_id, ID, required: true, loads: Types::QueryType

  field :answers, Types::AnswerType.connection_type, null: false

  def authorized?(question:)
     # context.current_user を使って該当のデータを参照できるユーザーかをチェックする
  end

  def subscribe(question:)
    {
      answers: question.answers
    }
  end

  def update(question:) {
    answers = Array(object)    
    {
      answers: answers 
    }
  end
end

ここで注目したいのは subscribeupdate メソッドです。
subscribe メソッドは subscription のクエリが発行された時に最初に呼ばれるメソッドです。
つまりここで返しているものが最初のレスポンスとなります。
update メソッドはその名の通りサーバー側で何か更新があった時に呼ばれるもので、その返り値がユーザーへ送られます。
この何か更新があった時を知らせるのが trigger です。

trigger

graphql-ruby.org

trigger を使うことで対象のデータに更新があったことを知らせることができます。
trigger は次のように使います。

AnketSchema.subscriptions.trigger('questionAnswered', { questionId: question_id }, answers, scope: question.team.id)

この trigger の第1引数は Schema に定義した subscriptoin の fieldに対応しています。
第2引数は field の引数、第三引数が実際に更新のあった object が入ります。
ここで注目したいのが第四引数で渡している scope です。
scope をすると更新を通知するクライアントを指定することができます。

動作を確認してみる

ここまでくると動作確認をすることができるのようになるのでまずは bundle exec rails s でサーバーを立ち上げます。
Action Cable は websocketで通信しているので、websocketが話せるクライアントを用意します。
今回はjs製の wscat を使って動作確認しました。
npm が入っていれば npm install -g wscat で手軽にインストールできます。

github.com

まずは接続をします。
うまくいくと welcome と返ってきます。

$ wscat -c wc://localhost:3000/cable
< {"type":"welcome"}

/cable は自分が指定したパスで任意のパスを config/route.rbmount ActionCable.server => '/cable' のように書くことで変更できます。
次に今回作成した GraphqlChannel を subscribe します。

> {"command":"subscribe","identifier":"{\"channel\":\"GraphqlChannel\"}"}
< {"identifier":"{\"channel\":\"GraphqlChannel\"}","type":"confirm_subscription"}

そして subscription を実行します。

> {"command":"message","identifier":"{\"channel\":\"GraphqlChannel\"}","data":"{\"action\":\"execute\",\"query\":\"subscription{ xxx }\"}"}

これで websocket 側の準備はできたので、bundle exec rails c をして AnketSchema.subscriptions.trigger('questionAnswered', { questionId: question_id }, Answer.first, scope: Question.first.tema.id) のように手動でトリガーしてみます。
うまく行けば次のようなレスポンスが返ってくるはずです

< {"identifier":"{\"channel\":\"GraphqlChannel\"}","message":{"result":{"data":{"questionAnswered": xxxxx },"more":true}}

ここまでくると後はクライアント側でその結果をハンドリングしていくだけです。
今回はここまでですが、Apollo を使っている場合はどうやって Apollo から graphql-ruby で実装された GraphQL server に Subscription を送るかが書かれたドキュメントがあるので、そちらが参考になります。
graphql-ruby.org

troubleshooting

trigger してもイベントを受け取ることができない

先ほどの wscat を使って動作確認をする例で bundle exec rails console をして trigger しましたが、自分がやった時は最初イベントを受け取ることができませんでした。
僕の場合、原因は使用していたサブスクリプションのアダプタがデフォルトの async だからでした。
async を使う場合は同一プロセス内でしか受け取れない?ようなので例えば Controller とかに trigger する実装をして検証すれば動作したかもしれませんが、 rails console のように別プロセスから行う場合は redispostgresql を使う必要があるようです。

今回は redis を選択したので dokcer を使ってを立ち上げます。

version: '3'
services:
  redis:
    image: redis
    ports:
      - "6379:6379"

後は config/cable.yaml を修正します。

development:
  adapter: redis
  url: redis://localhost:6379
  channel_prefix: app_dev

これでもサブスクライブできない時は publish している key と subscribe している key が異なる可能性があります。
これを確認する時 redis を使っている場合は redis-cli monitor を使うと捗ります。

docker-compose exec redis redis-cli monitor

monitor を使うと redis に対して何が実行されたかが出力されるので想定したタイミングでpublishされないや subscibe と publish が異なるキーだということに気付きやすくなります。

最後に

今回は graphql-ruby で subscription を使ってみました。
graphql-ruby としては大きくはまりどころはなく、むしろ Action Cable を使うのが初めてだったのでそちらの方がなれるのに時間がかかりました。