Mastodonを読む/ストリーミング処理
をテンプレートにして作成
[
トップ
] [
新規
|
一覧
|
単語検索
|
最終更新
|
ヘルプ
]
開始行:
[[Mastodonを読む]]
#contents
*はじめに [#ybf3408f]
home#indexの処理を見た際など、いくつかのコード中にストリ...
*復習:app/controllers/home_controller.rb [#p8a42340]
home#indexを見た際、ストリーミングに利用するURLが以下のよ...
#code(Ruby){{
def index
@body_classes = 'app-body'
@token = find_or_create_access_token...
@web_settings = Web::Setting.find_by(user: ...
@admin = Account.find_local(Setting....
@streaming_api_base_url = Rails.configuration.x.strea...
end
}}
Rails.configuration.xってなんだろ。アプリ独自の設定書くと...
indexのレンダリングが行われ、クライアントにはinitial-stat...
#code{{
{
"meta":{
"streaming_api_base_url":"http://localhost:4000",
"access_token":"8920c1d8279002fa0f6b74f96bf7c971f9973...
}}
ここで重要なのは、ストリーミングで使われるサーバがMastodo...
*クライアント側:app/assets/javascripts/components/contai...
サーバが別ということが確認できましたが、まずはサーバの前...
#code{{
class Mastodon extends React.Component {
componentDidMount() {
const { locale } = this.props;
const streamingAPIBaseURL = store.getState().getIn(['...
const accessToken = store.getState().getIn(['meta', '...
this.subscription = createStream(streamingAPIBaseURL,...
connected () {
store.dispatch(connectTimeline('home'));
},
disconnected () {
store.dispatch(disconnectTimeline('home'));
},
received (data) {
switch(data.event) {
case 'update':
store.dispatch(updateTimeline('home', JSON.pars...
break;
case 'delete':
store.dispatch(deleteFromTimelines(data.payload...
break;
case 'notification':
store.dispatch(updateNotifications(JSON.parse(d...
break;
}
},
reconnected () {
store.dispatch(connectTimeline('home'));
store.dispatch(refreshTimeline('home'));
store.dispatch(refreshNotifications());
}
});
省略
}
}}
createStreamで接続、引数で渡しているハンドラオブジェクト...
**app/assets/javascripts/components/stream.jsx [#p927c34c]
createStreamを確認しましょう。
#code{{
export default function getStream(streamingAPIBaseURL, ac...
const ws = new WebSocketClient(`${createWebSocketURL(st...
ws.onopen = connected;
ws.onmessage = e => received(JSON.parse(e.data));
ws.onclose = disconnected;
ws.onreconnect = reconnected;
return ws;
};
}}
というわけで、「/api/v1/streaming/」というURLにアクセスが...
*サーバ側:streaming/index.js [#qe46045f]
ここからサーバ側。「/api/v1/streaming/」で探してみると、s...
**クライアントからの接続受け付け [#i22b0c10]
最後の方から見ると以下の記述があります。
#code(javascript){{
wss.on('connection', ws => {
const location = url.parse(ws.upgradeReq.url, true)
const token = location.query.access_token
const req = { requestId: uuid.v4() }
accountFromToken(token, req, err => {
if (err) {
log.error(req.requestId, err)
ws.close()
return
}
switch(location.query.stream) {
case 'user':
streamFrom(`timeline:${req.accountId}`, req, stre...
break;
省略
}
})
})
}}
accountFromTokenは名前の通り、アクセストークンからアカウ...
**streamFrom [#e9f4ef39]
では。streamFromです。
#code(javascript){{
const streamFrom = (id, req, output, attachCloseHandler...
log.verbose(req.requestId, `Starting stream from ${id...
const listener = message => {
const { event, payload, queued_at } = JSON.parse(me...
const transmit = () => {
const now = new Date().getTime()
const delta = now - queued_at;
log.silly(req.requestId, `Transmitting for ${req....
output(event, payload)
}
// Only messages that may require filtering are sta...
// are already personalized and deletes do not matter
if (needsFiltering && event === 'update') {
省略
} else {
transmit()
}
}
subscribe(id, listener)
attachCloseHandler(id, listener)
}
}}
listenerを定義しています。listenerではmessageが送られてき...
**subscribe [#c7aa549c]
というわけで今度はsubscribe。
#code(javascript){{
const subscribe = (channel, callback) => {
log.silly(`Adding listener for ${channel}`)
subs[channel] = subs[channel] || []
subs[channel].push(callback)
}
}}
よくあるイベントリスナー的なコードです。ではこのsubsをど...
#code(javascript){{
const redisClient = redis.createClient({
host: process.env.REDIS_HOST || '127.0.0.1',
port: process.env.REDIS_PORT || 6379,
password: process.env.REDIS_PASSWORD
})
const subs = {}
redisClient.on('pmessage', (_, channel, message) => {
const callbacks = subs[channel]
log.silly(`New message on channel ${channel}`)
if (!callbacks) {
return
}
callbacks.forEach(callback => callback(message))
})
redisClient.psubscribe('timeline:*')
}}
Redisに接続し、Redisからメッセージが飛んで来たら対応するc...
*Redisへの書き込み:app/services/post_status_service.rb [...
というわけで後はRedisに書き込んでいる場所を探せばいいこと...
#code(Ruby){{
class PostStatusService < BaseService
def call(account, text, in_reply_to = nil, options = {})
media = validate_media!(options[:media_ids])
status = nil
ApplicationRecord.transaction do
status = account.statuses.create!(text: text,
thread: in_reply_...
sensitive: option...
spoiler_text: opt...
visibility: optio...
language: detect_...
application: opti...
attach_media(status, media)
end
process_mentions_service.call(status)
process_hashtags_service.call(status)
LinkCrawlWorker.perform_async(status.id) unless statu...
DistributionWorker.perform_async(status.id)
Pubsubhubbub::DistributionWorker.perform_async(status...
status
end
}}
いろいろなWorkerが呼ばれていますが、DistributionWorkerを...
app/works/distribution_worker.rb
#code(Ruby){{
class DistributionWorker < ApplicationWorker
include Sidekiq::Worker
def perform(status_id)
FanOutOnWriteService.new.call(Status.find(status_id))
rescue ActiveRecord::RecordNotFound
info("Couldn't find the status")
end
end
}}
さらにFanOutOnWriteService。
app/services/fan_out_on_write_service.rb
#code(Ruby){{
class FanOutOnWriteService < BaseService
# Push a status into home and mentions feeds
# @param [Status] status
def call(status)
raise Mastodon::RaceConditionError if status.visibili...
deliver_to_self(status) if status.account.local?
if status.direct_visibility?
deliver_to_mentioned_followers(status)
else
deliver_to_followers(status)
end
return if status.account.silenced? || !status.public_...
render_anonymous_payload(status)
deliver_to_hashtags(status)
return if status.reply? && status.in_reply_to_account...
deliver_to_public(status)
end
def deliver_to_self(status)
Rails.logger.debug "Delivering status #{status.id} to...
FeedManager.instance.push(:home, status.account, stat...
end
}}
**app/lib/feed_manager.rb [#u274292c]
FeedManagerのpushメソッド。
#code(Ruby){{
def push(timeline_type, account, status)
timeline_key = key(timeline_type, account.id)
if status.reblog?
# If the original status is within 40 statuses from...
rank = redis.zrevrank(timeline_key, status.reblog_o...
return if !rank.nil? && rank < 40
redis.zadd(timeline_key, status.id, status.reblog_o...
else
redis.zadd(timeline_key, status.id, status.id)
trim(timeline_type, account.id)
end
PushUpdateWorker.perform_async(account.id, status.id)
end
}}
なかなかしぶとい(笑)。さらにPushUpdateWorkerです。
#code(Ruby){{
class PushUpdateWorker
include Sidekiq::Worker
def perform(account_id, status_id)
account = Account.find(account_id)
status = Status.find(status_id)
message = InlineRenderer.render(status, account, 'api...
Redis.current.publish("timeline:#{account.id}", Oj.du...
rescue ActiveRecord::RecordNotFound
true
end
end
}}
「timeline:アカウントID」への書き込みがありました。これで...
*おわりに [#te19abd0]
今回はストリーミングの処理を見てきました。ストリーミング...
-クライアント側の接続、サーバから送られてきたら表示処理
-サーバ側その1。クライアントへデータを送る部分(Node)
-サーバ側その2。送るデータの書き込み部分(Rails)
サーバ内での情報の受け渡しにはRedisが使われていました。
ここまでで、初期画面表示、トゥート作成、フォロー、リモー...
終了行:
[[Mastodonを読む]]
#contents
*はじめに [#ybf3408f]
home#indexの処理を見た際など、いくつかのコード中にストリ...
*復習:app/controllers/home_controller.rb [#p8a42340]
home#indexを見た際、ストリーミングに利用するURLが以下のよ...
#code(Ruby){{
def index
@body_classes = 'app-body'
@token = find_or_create_access_token...
@web_settings = Web::Setting.find_by(user: ...
@admin = Account.find_local(Setting....
@streaming_api_base_url = Rails.configuration.x.strea...
end
}}
Rails.configuration.xってなんだろ。アプリ独自の設定書くと...
indexのレンダリングが行われ、クライアントにはinitial-stat...
#code{{
{
"meta":{
"streaming_api_base_url":"http://localhost:4000",
"access_token":"8920c1d8279002fa0f6b74f96bf7c971f9973...
}}
ここで重要なのは、ストリーミングで使われるサーバがMastodo...
*クライアント側:app/assets/javascripts/components/contai...
サーバが別ということが確認できましたが、まずはサーバの前...
#code{{
class Mastodon extends React.Component {
componentDidMount() {
const { locale } = this.props;
const streamingAPIBaseURL = store.getState().getIn(['...
const accessToken = store.getState().getIn(['meta', '...
this.subscription = createStream(streamingAPIBaseURL,...
connected () {
store.dispatch(connectTimeline('home'));
},
disconnected () {
store.dispatch(disconnectTimeline('home'));
},
received (data) {
switch(data.event) {
case 'update':
store.dispatch(updateTimeline('home', JSON.pars...
break;
case 'delete':
store.dispatch(deleteFromTimelines(data.payload...
break;
case 'notification':
store.dispatch(updateNotifications(JSON.parse(d...
break;
}
},
reconnected () {
store.dispatch(connectTimeline('home'));
store.dispatch(refreshTimeline('home'));
store.dispatch(refreshNotifications());
}
});
省略
}
}}
createStreamで接続、引数で渡しているハンドラオブジェクト...
**app/assets/javascripts/components/stream.jsx [#p927c34c]
createStreamを確認しましょう。
#code{{
export default function getStream(streamingAPIBaseURL, ac...
const ws = new WebSocketClient(`${createWebSocketURL(st...
ws.onopen = connected;
ws.onmessage = e => received(JSON.parse(e.data));
ws.onclose = disconnected;
ws.onreconnect = reconnected;
return ws;
};
}}
というわけで、「/api/v1/streaming/」というURLにアクセスが...
*サーバ側:streaming/index.js [#qe46045f]
ここからサーバ側。「/api/v1/streaming/」で探してみると、s...
**クライアントからの接続受け付け [#i22b0c10]
最後の方から見ると以下の記述があります。
#code(javascript){{
wss.on('connection', ws => {
const location = url.parse(ws.upgradeReq.url, true)
const token = location.query.access_token
const req = { requestId: uuid.v4() }
accountFromToken(token, req, err => {
if (err) {
log.error(req.requestId, err)
ws.close()
return
}
switch(location.query.stream) {
case 'user':
streamFrom(`timeline:${req.accountId}`, req, stre...
break;
省略
}
})
})
}}
accountFromTokenは名前の通り、アクセストークンからアカウ...
**streamFrom [#e9f4ef39]
では。streamFromです。
#code(javascript){{
const streamFrom = (id, req, output, attachCloseHandler...
log.verbose(req.requestId, `Starting stream from ${id...
const listener = message => {
const { event, payload, queued_at } = JSON.parse(me...
const transmit = () => {
const now = new Date().getTime()
const delta = now - queued_at;
log.silly(req.requestId, `Transmitting for ${req....
output(event, payload)
}
// Only messages that may require filtering are sta...
// are already personalized and deletes do not matter
if (needsFiltering && event === 'update') {
省略
} else {
transmit()
}
}
subscribe(id, listener)
attachCloseHandler(id, listener)
}
}}
listenerを定義しています。listenerではmessageが送られてき...
**subscribe [#c7aa549c]
というわけで今度はsubscribe。
#code(javascript){{
const subscribe = (channel, callback) => {
log.silly(`Adding listener for ${channel}`)
subs[channel] = subs[channel] || []
subs[channel].push(callback)
}
}}
よくあるイベントリスナー的なコードです。ではこのsubsをど...
#code(javascript){{
const redisClient = redis.createClient({
host: process.env.REDIS_HOST || '127.0.0.1',
port: process.env.REDIS_PORT || 6379,
password: process.env.REDIS_PASSWORD
})
const subs = {}
redisClient.on('pmessage', (_, channel, message) => {
const callbacks = subs[channel]
log.silly(`New message on channel ${channel}`)
if (!callbacks) {
return
}
callbacks.forEach(callback => callback(message))
})
redisClient.psubscribe('timeline:*')
}}
Redisに接続し、Redisからメッセージが飛んで来たら対応するc...
*Redisへの書き込み:app/services/post_status_service.rb [...
というわけで後はRedisに書き込んでいる場所を探せばいいこと...
#code(Ruby){{
class PostStatusService < BaseService
def call(account, text, in_reply_to = nil, options = {})
media = validate_media!(options[:media_ids])
status = nil
ApplicationRecord.transaction do
status = account.statuses.create!(text: text,
thread: in_reply_...
sensitive: option...
spoiler_text: opt...
visibility: optio...
language: detect_...
application: opti...
attach_media(status, media)
end
process_mentions_service.call(status)
process_hashtags_service.call(status)
LinkCrawlWorker.perform_async(status.id) unless statu...
DistributionWorker.perform_async(status.id)
Pubsubhubbub::DistributionWorker.perform_async(status...
status
end
}}
いろいろなWorkerが呼ばれていますが、DistributionWorkerを...
app/works/distribution_worker.rb
#code(Ruby){{
class DistributionWorker < ApplicationWorker
include Sidekiq::Worker
def perform(status_id)
FanOutOnWriteService.new.call(Status.find(status_id))
rescue ActiveRecord::RecordNotFound
info("Couldn't find the status")
end
end
}}
さらにFanOutOnWriteService。
app/services/fan_out_on_write_service.rb
#code(Ruby){{
class FanOutOnWriteService < BaseService
# Push a status into home and mentions feeds
# @param [Status] status
def call(status)
raise Mastodon::RaceConditionError if status.visibili...
deliver_to_self(status) if status.account.local?
if status.direct_visibility?
deliver_to_mentioned_followers(status)
else
deliver_to_followers(status)
end
return if status.account.silenced? || !status.public_...
render_anonymous_payload(status)
deliver_to_hashtags(status)
return if status.reply? && status.in_reply_to_account...
deliver_to_public(status)
end
def deliver_to_self(status)
Rails.logger.debug "Delivering status #{status.id} to...
FeedManager.instance.push(:home, status.account, stat...
end
}}
**app/lib/feed_manager.rb [#u274292c]
FeedManagerのpushメソッド。
#code(Ruby){{
def push(timeline_type, account, status)
timeline_key = key(timeline_type, account.id)
if status.reblog?
# If the original status is within 40 statuses from...
rank = redis.zrevrank(timeline_key, status.reblog_o...
return if !rank.nil? && rank < 40
redis.zadd(timeline_key, status.id, status.reblog_o...
else
redis.zadd(timeline_key, status.id, status.id)
trim(timeline_type, account.id)
end
PushUpdateWorker.perform_async(account.id, status.id)
end
}}
なかなかしぶとい(笑)。さらにPushUpdateWorkerです。
#code(Ruby){{
class PushUpdateWorker
include Sidekiq::Worker
def perform(account_id, status_id)
account = Account.find(account_id)
status = Status.find(status_id)
message = InlineRenderer.render(status, account, 'api...
Redis.current.publish("timeline:#{account.id}", Oj.du...
rescue ActiveRecord::RecordNotFound
true
end
end
}}
「timeline:アカウントID」への書き込みがありました。これで...
*おわりに [#te19abd0]
今回はストリーミングの処理を見てきました。ストリーミング...
-クライアント側の接続、サーバから送られてきたら表示処理
-サーバ側その1。クライアントへデータを送る部分(Node)
-サーバ側その2。送るデータの書き込み部分(Rails)
サーバ内での情報の受け渡しにはRedisが使われていました。
ここまでで、初期画面表示、トゥート作成、フォロー、リモー...
ページ名: