[[Mastodonを読む]] #contents *はじめに [#j10cefd2] 前回までで「/」にアクセスしたとき(ログイン済みとする)に初めに表示される画面の読解は終わりました。今回はトゥートをしたときに行われる処理について見ていきたいと思います。 *クライアントサイドの処理 [#o1c8247e] **app/assets/javascripts/components/features/compose/index.jsx [#xf112d87] PCの場合にしろ、スマホの場合にしろ、トゥートを行うためのコンポーネントはComposeクラスが使用されます。 &ref(compose.jpg); renderメソッドについては何度も見ているので省略します。 **app/assets/javascripts/components/features/compose/containers/compose_form_container.jsx [#ced5721b] コンポーネントとActionをつなぐ記述を確認します。ConposeFormContainerに書かれています。必要なところだけ抜き出すと、 #code{{ import { connect } from 'react-redux'; import ComposeForm from '../components/compose_form'; import { changeCompose, submitCompose, clearComposeSuggestions, fetchComposeSuggestions, selectComposeSuggestion, changeComposeSpoilerText, insertEmojiCompose } from '../../../actions/compose'; const mapDispatchToProps = (dispatch) => ({ onChange (text) { dispatch(changeCompose(text)); }, onSubmit () { dispatch(submitCompose()); }, 他のイベントハンドラ }); export default connect(mapStateToProps, mapDispatchToProps)(ComposeForm); }} ところで、jsxは全く知らないのですが、「export default」としておくとその右にあるものがConposeFormContainerとして使われるって認識でいいんですよね? **app/assets.javascripts/components/actions/compose.jsx [#m5f1b219] というわけで、submitCompose。ちょっと長めですがやってることはそこまで難しくありません。 #code{{ export function submitCompose() { return function (dispatch, getState) { const status = emojione.shortnameToUnicode(getState().getIn(['compose', 'text'], '')); if (!status || !status.length) { return; } dispatch(submitComposeRequest()); api(getState).post('/api/v1/statuses', { status, in_reply_to_id: getState().getIn(['compose', 'in_reply_to'], null), media_ids: getState().getIn(['compose', 'media_attachments']).map(item => item.get('id')), sensitive: getState().getIn(['compose', 'sensitive']), spoiler_text: getState().getIn(['compose', 'spoiler_text'], ''), visibility: getState().getIn(['compose', 'privacy']) }, { headers: { 'Idempotency-Key': getState().getIn(['compose', 'idempotencyKey']) } }).then(function (response) { dispatch(submitComposeSuccess({ ...response.data })); // To make the app more responsive, immediately get the status into the columns dispatch(updateTimeline('home', { ...response.data })); if (response.data.in_reply_to_id === null && response.data.visibility === 'public') { if (getState().getIn(['timelines', 'community', 'loaded'])) { dispatch(updateTimeline('community', { ...response.data })); } if (getState().getIn(['timelines', 'public', 'loaded'])) { dispatch(updateTimeline('public', { ...response.data })); } } }).catch(function (error) { dispatch(submitComposeFail(error)); }); }; }; }} 「/api/v1/statuses」に入力をPOSTして、成功したら各タイムラインを更新です。更新ActionおよびReducerでの処理も淡々と処理が行われているだけなので省略します。 *サーバサイドの処理 [#y45db5e6] ここまででクライアントサイドでどういう処理が行われているか見てきたので、次はサーバサイドを見ていきます。 **app/controllers/api/v1/statuses_controller.rb [#dff20b9a] 「/api/v1/statuses」にアクセスされたときに呼び出されるのは、StatusesControllerのcreateメソッド、 #code(Ruby){{ def create @status = PostStatusService.new.call(current_user.account, status_params[:status], status_params[:in_reply_to_id].blank? ? nil : Status.find(status_params[:in_reply_to_id]), media_ids: status_params[:media_ids], sensitive: status_params[:sensitive], spoiler_text: status_params[:spoiler_text], visibility: status_params[:visibility], application: doorkeeper_token.application, idempotency: request.headers['Idempotency-Key']) render :show end }} Serviceって使ったことないけど昔からあるんですかね?ともかく、PostStatusServiceに制御が移ります。 **app/services/post_status_service.rb [#e33fe559] #code(Ruby){{ class PostStatusService < BaseService # Post a text status update, fetch and notify remote users mentioned # @param [Account] account Account from which to post # @param [String] text Message # @param [Status] in_reply_to Optional status to reply to # @param [Hash] options # @option [Boolean] :sensitive # @option [String] :visibility # @option [String] :spoiler_text # @option [Enumerable] :media_ids Optional array of media IDs to attach # @option [Doorkeeper::Application] :application # @option [String] :idempotency Optional idempotency key # @return [Status] def call(account, text, in_reply_to = nil, options = {}) if options[:idempotency].present? existing_id = redis.get("idempotency:status:#{account.id}:#{options[:idempotency]}") return Status.find(existing_id) if existing_id end media = validate_media!(options[:media_ids]) status = nil ApplicationRecord.transaction do status = account.statuses.create!(text: text, thread: in_reply_to, sensitive: options[:sensitive], spoiler_text: options[:spoiler_text] || '', visibility: options[:visibility], language: detect_language_for(text, account), application: options[:application]) attach_media(status, media) end process_mentions_service.call(status) process_hashtags_service.call(status) LinkCrawlWorker.perform_async(status.id) unless status.spoiler_text.present? DistributionWorker.perform_async(status.id) Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id) if options[:idempotency].present? redis.setex("idempotency:status:#{account.id}:#{options[:idempotency]}", 3_600, status.id) end status end }} 送信されたトゥート(Status)の保存自体は普通のRailsでのモデル新規作成です。違いがあるのはprocess_*_service.callと*Worker.perform_asyncです。 **app/services/process_mentions_service.rb [#c3447622] まず、process_mentions_serviceです。その名の通り、メンション(リプ)先のアカウントに通知を行っています。ただし、Mastodonでは別インスタンスにいるユーザ(リモートユーザ)宛のメンションもできるのでその部分で少し処理分岐が発生しています。 #code(Ruby){{ class ProcessMentionsService < BaseService include StreamEntryRenderer # Scan status for mentions and fetch remote mentioned users, create # local mention pointers, send Salmon notifications to mentioned # remote users # @param [Status] status def call(status) return unless status.local? status.text.scan(Account::MENTION_RE).each do |match| username, domain = match.first.split('@') mentioned_account = Account.find_remote(username, domain) if mentioned_account.nil? && !domain.nil? begin mentioned_account = follow_remote_account_service.call(match.first.to_s) rescue Goldfinger::Error, HTTP::Error mentioned_account = nil end end next if mentioned_account.nil? mentioned_account.mentions.where(status: status).first_or_create(status: status) end status.mentions.includes(:account).each do |mention| mentioned_account = mention.account if mentioned_account.local? NotifyService.new.call(mentioned_account, mention) else NotificationWorker.perform_async(stream_entry_to_xml(status.stream_entry), status.account_id, mentioned_account.id) end end end }} リモートユーザの場合はリモートユーザがいるインスタンスにデータを投げて(Worker経由でService実行)、受けて処理する(Controller→Worker→Service)ということが行われています。全部追いかけていくと長くなるので省略します。 **app/workers/distribution_worker.rb [#k3add90a] 次にいくつかWorkerを実行しています。このWorkerは[[Sidekiq>http://sidekiq.org/]]というgemを利用しており、処理はバックグラウンドで行われるようです。 DistributionWorkerでは投稿したトゥートをフォロワーなどに配信する処理を行っているようです。 #code(Ruby){{ class DistributionWorker < ApplicationWorker include Sidekiq::Worker def perform(status_id) FanOutOnWriteService.new.call(Status.find(status_id)) rescue ActiveRecord::RecordNotFound info("Couldn't find the status") end end }} app/services/fan_out_on_write_service.rb #code(Ruby){{ class FanOutOnWriteService < BaseService # Push a status into home and mentions feeds # @param [Status] status def call(status) raise Mastodon::RaceConditionError if status.visibility.nil? deliver_to_self(status) if status.account.local? if status.direct_visibility? deliver_to_mentioned_followers(status) else deliver_to_followers(status) end return if status.account.silenced? || !status.public_visibility? || status.reblog? render_anonymous_payload(status) deliver_to_hashtags(status) return if status.reply? && status.in_reply_to_account_id != status.account_id deliver_to_public(status) end }} **app/workers/pubsubhubbub/distribution_worker.rb [#v654e1a5] 最後にPubsubhubbubの方のDistributionWorkerです。 #code(Ruby){{ class Pubsubhubbub::DistributionWorker include Sidekiq::Worker sidekiq_options queue: 'push' def perform(stream_entry_id) stream_entry = StreamEntry.find(stream_entry_id) return if stream_entry.status&.direct_visibility? account = stream_entry.account payload = AtomSerializer.render(AtomSerializer.new.feed(account, [stream_entry])) domains = account.followers_domains Subscription.where(account: account).active.select('id, callback_url').find_each do |subscription| next unless domains.include?(Addressable::URI.parse(subscription.callback_url).host) Pubsubhubbub::DeliveryWorker.perform_async(subscription.id, payload) end rescue ActiveRecord::RecordNotFound true end end }} これにより連合を組んでいるインスタンスにトゥートが送信される、と思っているのですが、accountってなんだろう。連合組んでるインスタンスにはトゥートがあったら全部飛んでくわけじゃない?もう少し確認が必要に思いますが今回は保留。 *おわりに [#y42a185d] 今回は新規トゥート送信時のクライアントサイド、サーバサイドの処理を見てきました。クライアントサイドについては今まで見てきたのと同じようなReact, Reduxの処理でした。 一方、サーバサイドではServiceを使用し処理をモジュール化、また、Mastodonの特徴である別インスタンスへの通知を行うためにSidekiqを利用したWorkerが使用されていました。これについては雰囲気こんな感じという読み方しかしていないので誤読があるかもしれません。特に、別インスタンスに新規トゥートを送ってるところは思ってたのと違う(思ってたことが間違ってる可能性もある)ので連合の仕様をちゃんと確認する必要がありそうです。