[[Mastodonを読む]] #contents *はじめに [#ybf3408f] home#indexの処理を見た際など、いくつかのコード中にストリーミングに関する処理がありました。今回はそれらについて見ていき、Mastodonでどのようにストリーミング処理が行われているのかを確認しましょう。 *復習:app/controllers/home_controller.rb [#p8a42340] home#indexを見た際、ストリーミングに利用するURLが以下のように設定されていました。 #code(Ruby){{ def index @body_classes = 'app-body' @token = find_or_create_access_token.token @web_settings = Web::Setting.find_by(user: current_user)&.data || {} @admin = Account.find_local(Setting.site_contact_username) @streaming_api_base_url = Rails.configuration.x.streaming_api_base_url end }} Rails.configuration.xってなんだろ。アプリ独自の設定書くときに使っていいのかな。 indexのレンダリングが行われ、クライアントにはinitial-stateとしてストリーミングAPIのURLが渡されることになります。なお、開発環境だと以下のようになります。 #code{{ { "meta":{ "streaming_api_base_url":"http://localhost:4000", "access_token":"8920c1d8279002fa0f6b74f96bf7c971f997318755a35f5a26b511229a2786b7", }} ここで重要なのは、ストリーミングで使われるサーバがMastodon本体とは別、ということです。(Mastodon本体は開発環境の場合、http://localhost:3000になります) *クライアント側:app/assets/javascripts/components/containers/mastdon.jsx [#e6c5a731] サーバが別ということが確認できましたが、まずはサーバの前にクライアントの確認をしましょう。クライアント処理はReactのMastodonクラスに書かれています。(以下はホームタイムラインですが、ローカルタイムライン、連合タイムラインも処理はほぼ同じです) #code{{ class Mastodon extends React.Component { componentDidMount() { const { locale } = this.props; const streamingAPIBaseURL = store.getState().getIn(['meta', 'streaming_api_base_url']); const accessToken = store.getState().getIn(['meta', 'access_token']); this.subscription = createStream(streamingAPIBaseURL, accessToken, 'user', { connected () { store.dispatch(connectTimeline('home')); }, disconnected () { store.dispatch(disconnectTimeline('home')); }, received (data) { switch(data.event) { case 'update': store.dispatch(updateTimeline('home', JSON.parse(data.payload))); break; case 'delete': store.dispatch(deleteFromTimelines(data.payload)); break; case 'notification': store.dispatch(updateNotifications(JSON.parse(data.payload), getMessagesForLocale(locale), locale)); break; } }, reconnected () { store.dispatch(connectTimeline('home')); store.dispatch(refreshTimeline('home')); store.dispatch(refreshNotifications()); } }); 省略 } }} createStreamで接続、引数で渡しているハンドラオブジェクト?っていうのかな?のreceivedでサーバからデータを受信したときにタイムラインの更新を行っている雰囲気です。 **app/assets/javascripts/components/stream.jsx [#p927c34c] createStreamを確認しましょう。 #code{{ export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) { const ws = new WebSocketClient(`${createWebSocketURL(streamingAPIBaseURL)}/api/v1/streaming/?access_token=${accessToken}&stream=${stream}`); ws.onopen = connected; ws.onmessage = e => received(JSON.parse(e.data)); ws.onclose = disconnected; ws.onreconnect = reconnected; return ws; }; }} というわけで、「/api/v1/streaming/」というURLにアクセスが行われるようです。 *サーバ側:streaming/index.js [#qe46045f] ここからサーバ側。「/api/v1/streaming/」で探してみると、streamingディレクトリにindex.jsがあるのに気づきます。ストリーミング処理はJavascript(Node)で書かれているようです。 **クライアントからの接続受け付け [#i22b0c10] 最後の方から見ると以下の記述があります。 #code(javascript){{ wss.on('connection', ws => { const location = url.parse(ws.upgradeReq.url, true) const token = location.query.access_token const req = { requestId: uuid.v4() } accountFromToken(token, req, err => { if (err) { log.error(req.requestId, err) ws.close() return } switch(location.query.stream) { case 'user': streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(ws)) break; 省略 } }) }) }} accountFromTokenは名前の通り、アクセストークンからアカウントのIDを取得しています。 **streamFrom [#e9f4ef39] では。streamFromです。 #code(javascript){{ const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false) => { log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`) const listener = message => { const { event, payload, queued_at } = JSON.parse(message) const transmit = () => { const now = new Date().getTime() const delta = now - queued_at; log.silly(req.requestId, `Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`) output(event, payload) } // Only messages that may require filtering are statuses, since notifications // are already personalized and deletes do not matter if (needsFiltering && event === 'update') { 省略 } else { transmit() } } subscribe(id, listener) attachCloseHandler(id, listener) } }} listenerを定義しています。listenerではmessageが送られてきたらそれをoutputオブジェクトに書き込むということをしています。これにより、接続先のクライアントに情報が送られるということになります。 **subscribe [#c7aa549c] というわけで今度はsubscribe。 #code(javascript){{ const subscribe = (channel, callback) => { log.silly(`Adding listener for ${channel}`) subs[channel] = subs[channel] || [] subs[channel].push(callback) } }} よくあるイベントリスナー的なコードです。ではこのsubsをどこで使っているのかというと、すぐ上の部分、 #code(javascript){{ const redisClient = redis.createClient({ host: process.env.REDIS_HOST || '127.0.0.1', port: process.env.REDIS_PORT || 6379, password: process.env.REDIS_PASSWORD }) const subs = {} redisClient.on('pmessage', (_, channel, message) => { const callbacks = subs[channel] log.silly(`New message on channel ${channel}`) if (!callbacks) { return } callbacks.forEach(callback => callback(message)) }) redisClient.psubscribe('timeline:*') }} Redisに接続し、Redisからメッセージが飛んで来たら対応するcallback(listener)が呼び出されるようになっています。つまり、Redisへの書き込みが行われることでストリーミングサーバが反応→クライアントにプッシュが行われるようです。なお、psubscribeはpattern subscribeの略のようです。 *Redisへの書き込み:app/services/post_status_service.rb [#y51fdace] というわけで後はRedisに書き込んでいる場所を探せばいいことになります。実はすでに一回出てきていました。ここからRallsになります。 #code(Ruby){{ class PostStatusService < BaseService def call(account, text, in_reply_to = nil, options = {}) media = validate_media!(options[:media_ids]) status = nil ApplicationRecord.transaction do status = account.statuses.create!(text: text, thread: in_reply_to, sensitive: options[:sensitive], spoiler_text: options[:spoiler_text] || '', visibility: options[:visibility], language: detect_language_for(text, account), application: options[:application]) attach_media(status, media) end process_mentions_service.call(status) process_hashtags_service.call(status) LinkCrawlWorker.perform_async(status.id) unless status.spoiler_text.present? DistributionWorker.perform_async(status.id) Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id) status end }} いろいろなWorkerが呼ばれていますが、DistributionWorkerを見てみる。 app/works/distribution_worker.rb #code(Ruby){{ class DistributionWorker < ApplicationWorker include Sidekiq::Worker def perform(status_id) FanOutOnWriteService.new.call(Status.find(status_id)) rescue ActiveRecord::RecordNotFound info("Couldn't find the status") end end }} さらにFanOutOnWriteService。 app/services/fan_out_on_write_service.rb #code(Ruby){{ class FanOutOnWriteService < BaseService # Push a status into home and mentions feeds # @param [Status] status def call(status) raise Mastodon::RaceConditionError if status.visibility.nil? deliver_to_self(status) if status.account.local? if status.direct_visibility? deliver_to_mentioned_followers(status) else deliver_to_followers(status) end return if status.account.silenced? || !status.public_visibility? || status.reblog? render_anonymous_payload(status) deliver_to_hashtags(status) return if status.reply? && status.in_reply_to_account_id != status.account_id deliver_to_public(status) end def deliver_to_self(status) Rails.logger.debug "Delivering status #{status.id} to author" FeedManager.instance.push(:home, status.account, status) end }} **app/lib/feed_manager.rb [#u274292c] FeedManagerのpushメソッド。 #code(Ruby){{ def push(timeline_type, account, status) timeline_key = key(timeline_type, account.id) if status.reblog? # If the original status is within 40 statuses from top, do not re-insert it into the feed rank = redis.zrevrank(timeline_key, status.reblog_of_id) return if !rank.nil? && rank < 40 redis.zadd(timeline_key, status.id, status.reblog_of_id) else redis.zadd(timeline_key, status.id, status.id) trim(timeline_type, account.id) end PushUpdateWorker.perform_async(account.id, status.id) end }} なかなかしぶとい(笑)。さらにPushUpdateWorkerです。 #code(Ruby){{ class PushUpdateWorker include Sidekiq::Worker def perform(account_id, status_id) account = Account.find(account_id) status = Status.find(status_id) message = InlineRenderer.render(status, account, 'api/v1/statuses/show') Redis.current.publish("timeline:#{account.id}", Oj.dump(event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i)) rescue ActiveRecord::RecordNotFound true end end }} 「timeline:アカウントID」への書き込みがありました。これで、Redisに書き込みが行われ、最終的にクライアントに届くということになります。フォロワーにトゥートが届く仕組みもほぼ同じです。リモートインスタンスにいるフォロワーの場合は仕組みがかなり複雑になっていますが、ひとつずつ確認していけば理解できると思います。 *おわりに [#te19abd0] 今回はストリーミングの処理を見てきました。ストリーミングは次の3つから構成されています。 -クライアント側の接続、サーバから送られてきたら表示処理 -サーバ側その1。クライアントへデータを送る部分(Node) -サーバ側その2。送るデータの書き込み部分(Rails) サーバ内での情報の受け渡しにはRedisが使われていました。 ここまでで、初期画面表示、トゥート作成、フォロー、リモートフォロー、ストリーミングと一通りの処理を見てきたことになります。実はそろそろ1.4が出そうですが、これにて「Mastodonを読む」完結としたいと思います。