轉至主要內容

使用通道

到目前為止,我們已經使用 takeput Effect 與 Redux Store 進行通訊。通道能概括這些 Effect,以便與外部事件源或在 Sagas 之間自身進行通訊。它們也可做為封存特定從 Store 而來的動作的佇列。

在本區段中,我們將會了解

  • 如何使用 yield actionChannel Effect 來暫存特定從 Store 而來的動作。

  • 如何使用 eventChannel 工廠函數將 take Effect 連接至外部事件源。

  • 如何使用通用 channel 工廠函數建立通道,並在 take/put Effect 中使用它們在兩個 Sagas 之間進行通訊。

使用 actionChannel effect

讓我們回顧經典範例

import { take, fork, ... } from 'redux-saga/effects'

function* watchRequests() {
while (true) {
const {payload} = yield take('REQUEST')
yield fork(handleRequest, payload)
}
}

function* handleRequest(payload) { ... }

上記の例は典型的な「監視と分岐」パターンを表しています。watchRequestsSagaはforkを使用してブロックするのを避け、ストアからのアクションをすべて欠落しないようにします。REQUESTアクションごとにhandleRequestタスクが作成されます。したがって、多数のアクションが素早く発火する場合、多数のhandleRequestタスクを同時に実行できます。

では、要件が次のとおりであるとします。REQUESTを直列に処理したいとします。4つのアクションが一度実行された場合、最初のREQUESTアクションを処理し、そのアクションを完了してから、2番目のアクションを処理していきます...

したがって、処理されていないすべてのアクションを「キュー」に登録し、現在のリクエストの処理が完了したら、キューから次のメッセージを取得したいと思います。

Redux-SagaにはactionChannelというちょっとしたヘルパーエフェクトが用意されており、これを使用して処理できます。このヘルパーを使用して、前の例をどのように書き直せるかを見てみましょう。

import { take, actionChannel, call, ... } from 'redux-saga/effects'

function* watchRequests() {
// 1- Create a channel for request actions
const requestChan = yield actionChannel('REQUEST')
while (true) {
// 2- take from the channel
const {payload} = yield take(requestChan)
// 3- Note that we're using a blocking call
yield call(handleRequest, payload)
}
}

function* handleRequest(payload) { ... }

最初のステップは、アクションチャンネルを作成することです。yield actionChannel(pattern)を使用します。ここで、patternはtake(pattern)で以前に説明したものと同じ規則を使用して解釈されます。2つの形式の違いは、actionChannelは佐賀がまだ受け取る準備ができていない場合(例:APIコールによってブロックされている場合)に受信メッセージをバッファリングできることです。

次はyield take(requestChan)です。Reduxストアから特定のアクションを受け取るためにpatternとともに使用される他に、takeはチャンネルで使用することもできます(上で特定のReduxアクションからチャンネルオブジェクトを作成しました)。takeは、チャンネルでメッセージが利用可能になるまで佐賀をブロックします。また、インターフェイスバッファにメッセージが保存されている場合は、すぐに再開することもできます。

重要なのは、ブロックcallをどのように使用しているかということです。call(handleRequest)が戻るまで、佐賀はブロックされたままになります。しかし、その間、佐賀がブロックされている間に他のREQUESTアクションがディスパッチされると、requestChanによって内部的にキューに登録されます。佐賀がcall(handleRequest)から再開して次のyield take(requestChan)を実行するときに、takeはキューに登録されたメッセージで解決します。

既定では、actionChannelはすべての受信メッセージを制限なくバッファリングします。バッファリングをより細かく制御する必要がある場合は、Bufferの引数をエフェクトクリエーターに提供できます。Redux-Sagaには、一般的なバッファ(none、dropping、sliding)が用意されていますが、独自のバッファリングを実装することもできます。詳細については、APIドキュメントを参照してください。

たとえば、最新の5つのアイテムのみを処理する必要がある場合は、次を使用できます。

import { buffers } from 'redux-saga'
import { actionChannel } from 'redux-saga/effects'

function* watchRequests() {
const requestChan = yield actionChannel('REQUEST', buffers.sliding(5))
...
}

透過 eventChannel 工廠連接外部事件

actionChannel (Effect) 一樣,eventChannel (工廠函數,非 Effect) 會為事件建立一個通道,但不是來自 Redux Store 的事件來源。

這個基本範例會從間隔中建立一個通道

import { eventChannel, END } from 'redux-saga'

function countdown(secs) {
return eventChannel(emitter => {
const iv = setInterval(() => {
secs -= 1
if (secs > 0) {
emitter(secs)
} else {
// this causes the channel to close
emitter(END)
}
}, 1000);
// The subscriber must return an unsubscribe function
return () => {
clearInterval(iv)
}
}
)
}

eventChannel 中的第一個參數是 訂閱者 函數。訂閱者的角色是初始化外部事件來源 (上述範例使用 setInterval) ,然後透過呼叫提供的 emitter,將來源中所有進來的事件路由到通道。在上述範例中,我們會在每一秒呼叫 emitter

注意:您需要淨化事件來源,以免透過事件通道傳遞 null 或未定義。即使透過數字沒問題,我們建議您將您的事件通道資料像您的 redux 動作那樣建構。{ number } 高於 number

注意呼叫 emitter(END)。我們用這個來通知任何通道消費者通道已關閉,意即不會有任何其他訊息傳遞到這個通道中。

我們來看一下如何從我們的 Saga 使用這個通道。(這取自於資源庫中的取消計數器範例。)

import { take, put, call } from 'redux-saga/effects'
import { eventChannel, END } from 'redux-saga'

// creates an event Channel from an interval of seconds
function countdown(seconds) { ... }

export function* saga() {
const chan = yield call(countdown, value)
try {
while (true) {
// take(END) will cause the saga to terminate by jumping to the finally block
let seconds = yield take(chan)
console.log(`countdown: ${seconds}`)
}
} finally {
console.log('countdown terminated')
}
}

Saga 會回報 take(chan)。這會導致 Saga 封鎖,直到有訊息放入通道為止。在上述範例中,這會對應到我們呼叫 emitter(secs) 的時候。同時注意,我們會在 try/finally 區塊中執行整個 while (true) {...} 迴圈。當間隔終止時,倒數計時函數會透過呼叫 emitter(END) 關閉事件通道。關閉通道會導致封鎖在該通道 take 上的所有 Saga 終止。在我們的範例中,終止 Saga 會導致它跳到其 finally 區塊 (如果提供,否則 Saga 會終止)。

訂閱者會回傳 取消訂閱 函數。這會在事件來源完成前由通道用於取消訂閱。在一個從事件通道消耗訊息的 Saga 內,如果我們想在事件來源完成前 早期退出 (例如:Saga 已取消),您可以呼叫 chan.close() 來關閉通道並取消訂閱來源。

例如,我們可以讓我們的 Saga 支援取消

import { take, put, call, cancelled } from 'redux-saga/effects'
import { eventChannel, END } from 'redux-saga'

// creates an event Channel from an interval of seconds
function countdown(seconds) { ... }

export function* saga() {
const chan = yield call(countdown, value)
try {
while (true) {
let seconds = yield take(chan)
console.log(`countdown: ${seconds}`)
}
} finally {
if (yield cancelled()) {
chan.close()
console.log('countdown cancelled')
}
}
}

以下是您如何使用事件通道把 WebSocket 事件傳遞到您的 saga 中的另一個範例 (例如:使用 socket.io 函式庫)。假設您正在等待伺服器訊息 ping 然後在一段延遲後回覆 pong 訊息。

import { take, put, call, apply, delay } from 'redux-saga/effects'
import { eventChannel } from 'redux-saga'
import { createWebSocketConnection } from './socketConnection'

// this function creates an event channel from a given socket
// Setup subscription to incoming `ping` events
function createSocketChannel(socket) {
// `eventChannel` takes a subscriber function
// the subscriber function takes an `emit` argument to put messages onto the channel
return eventChannel(emit => {

const pingHandler = (event) => {
// puts event payload into the channel
// this allows a Saga to take this payload from the returned channel
emit(event.payload)
}

const errorHandler = (errorEvent) => {
// create an Error object and put it into the channel
emit(new Error(errorEvent.reason))
}

// setup the subscription
socket.on('ping', pingHandler)
socket.on('error', errorHandler)

// the subscriber must return an unsubscribe function
// this will be invoked when the saga calls `channel.close` method
const unsubscribe = () => {
socket.off('ping', pingHandler)
}

return unsubscribe
})
}

// reply with a `pong` message by invoking `socket.emit('pong')`
function* pong(socket) {
yield delay(5000)
yield apply(socket, socket.emit, ['pong']) // call `emit` as a method with `socket` as context
}

export function* watchOnPings() {
const socket = yield call(createWebSocketConnection)
const socketChannel = yield call(createSocketChannel, socket)

while (true) {
try {
// An error from socketChannel will cause the saga jump to the catch block
const payload = yield take(socketChannel)
yield put({ type: INCOMING_PONG_PAYLOAD, payload })
yield fork(pong, socket)
} catch(err) {
console.error('socket error:', err)
// socketChannel is still open in catch block
// if we want end the socketChannel, we need close it explicitly
// socketChannel.close()
}
}
}

注意:預設情況下,eventChannel 上的訊息不會緩衝。您必須提供緩衝區給 eventChannel 工廠,才能指定通道的緩衝策略 (例如:eventChannel(subscriber, buffer))。 查看 API 文件 以取得更多資訊。

在此 WebSocket 範例中,當發生 Socket 錯誤時 SocketChannel 可能會發出錯誤,會中止我們的 yield take(socketChannel) 在這個事件頻道上等候。請注意,發出錯誤並不會預設中止頻道,我們需要明確關閉頻道,才能在錯誤後結束頻道。

使用頻道讓 Sagas 之間溝通

除了作用頻道和事件頻道,你也可以直接建立預設沒有連結到任何來源的頻道。接著你可以人工將資料 put 到頻道內。如果你想要使用頻道讓 Sagas 之間溝通,這樣會很方便。

舉例來說,讓我們回顧前一個處理請求的範例。

import { take, fork, ... } from 'redux-saga/effects'

function* watchRequests() {
while (true) {
const {payload} = yield take('REQUEST')
yield fork(handleRequest, payload)
}
}

function* handleRequest(payload) { ... }

我們看過了 watch-and-fork 範式,允許同時處理多個請求,同時執行的工作人員任務數量沒有限制。接著,我們使用了 actionChannel 效果一次限制同時執行的任務為一個。

因此,讓我們假設我們的需求是要限制同時執行的任務數量為三個。當我們收到一個請求,且同時執行的任務少於三個時,我們可以立即處理請求,否則會將任務排隊,並且等候三個 時段 中的一個空出來。

以下是一個使用頻道的處理範例

import { channel } from 'redux-saga'
import { take, fork, ... } from 'redux-saga/effects'

function* watchRequests() {
// create a channel to queue incoming requests
const chan = yield call(channel)

// create 3 worker 'threads'
for (var i = 0; i < 3; i++) {
yield fork(handleRequest, chan)
}

while (true) {
const {payload} = yield take('REQUEST')
yield put(chan, payload)
}
}

function* handleRequest(chan) {
while (true) {
const payload = yield take(chan)
// process the request
}
}

在上述範例中,我們使用 channel 工廠建立了一個頻道。我們會取回一個預設會緩衝我們放入訊息的所有訊息的頻道 (除非有一個 pending taker,taker 會立即使用此訊息繼續)。

接著 watchRequests saga 分岔了三個工作人員 sagas。請注意,建立的頻道會提供給所有分岔的 sagas。watchRequests 會使用這個頻道將工作 分派 給三個工作人員 sagas。針對每個 REQUEST 作用,Saga 會將 payload 放入頻道。接著,任何 空閒的 工作人員會去取那個 payload。否則,它會由頻道排隊,直到工作人員 Saga 已準備好取用。

所有三個工作人員都會執行典型的 while 迴圈。在每次反覆運算時,工作人員會取下一個請求,或者會封鎖直到有訊息可用。請注意,這個機制在三個工作人員之間提供了自動負載平衡。快速的工作人員不會被慢的工作人員拖慢。

使用 multicastChannel不同的 工作人員溝通

在上一個區塊我們看到,如何使用 channels 負載平衡 相同fork 多次的工作人員之間的請求。萬一我們需要將一個作用放進頻道,並讓 多個不同的 工作人員消耗它,那該怎麼辦?

我們可能需要將傳入請求傳遞給不同的工作人員來執行不同的副作用。

以下是使用 channels 時可以見識到此問題的範例:當我們以在 channel 中放入 put(使用 yield put(chan, payload))時,我們永遠只會執行 一個 工作人員(logWorkermainWorker),而不是 兩個

import { channel } from 'redux-saga'
import { take, fork, call, put } from 'redux-saga/effects'

function* watchRequests() {
// create a channel to queue incoming requests
const chan = yield call(channel)

// fork both workers
yield fork(logWorker, chan)
yield fork(mainWorker, chan)

while (true) {
const { payload } = yield take('REQUEST')

// put here will reach only one worker, not both!
yield put(chan, payload)
}
}

function* logWorker(channel) {
while (true) {
const payload = yield take(channel)
// Log the request somewhere..
console.log('logWorker:', payload)
}
}

function* mainWorker(channel) {
while (true) {
const payload = yield take(channel)
// Process the request
console.log('mainWorker', payload)
}
}

要解決此問題,我們需要使用 multicastChannel,它會將動作同時廣播到所有工作人員。

請注意,要將 takemulticastChannel 搭配使用,我們必須傳遞額外的引數 模式,而該模式可用於篩選要 接收 的動作。

請參閱以下範例:

import { multicastChannel } from 'redux-saga'
import { take, fork, call, put } from 'redux-saga/effects'

function* watchRequests() {
// create a multicastChannel to queue incoming requests
const channel = yield call(multicastChannel)

// fork different workers
yield fork(logWorker, channel)
yield fork(mainWorker, channel)

while (true) {
const { payload } = yield take('REQUEST')
yield put(channel, payload)
}
}

function* logWorker(channel) {
while (true) {
// Pattern '*' for simplicity
const payload = yield take(channel, '*')
// Log the request somewhere..
console.log('logWorker:', payload)
}
}

function* mainWorker(channel) {
while (true) {
// Pattern '*' for simplicity
const payload = yield take(channel, '*')
// Process the request
console.log('mainWorker', payload)
}
}