Search code examples
websocket redux redux-saga

redux-saga, websockets and actions queue


I have the following problem: the server sends messages to the client through a websocket. On the client, I need to display these messages to the user. The problem is that sometimes messages arrive too fast, so I need to organize some sort of queue and show the messages one after another.

my saga:

import { eventChannel, effects, takeEvery } from 'redux-saga';
import { types, actionCreators } from './actions';

// Effect creators used by the sagas below, pulled out of the `effects` namespace imported from redux-saga.
const { call, put, take, race } = effects;

/**
 * Wraps a socket's event callbacks in an event channel.
 *
 * Each `open`, `close`, `error` and `message` event is turned into an action
 * by the matching `actionCreators` function and emitted on the channel.
 * The unsubscribe callback handed to `eventChannel` closes the socket.
 *
 * @param {Object} socket - object with `onopen`/`onclose`/`onerror`/`onmessage`
 *   handler slots and a `close()` method.
 * @returns {*} the channel created by `eventChannel`.
 */
function watchMessages(socket) {
  // Handler slot on the socket -> name of the action creator wrapping the event.
  const bindings = [
    ['onopen', 'socketOpen'],
    ['onclose', 'socketClose'],
    ['onerror', 'socketError'],
    ['onmessage', 'socketMessage'],
  ];

  const subscribe = (emit) => {
    for (const [slot, creatorName] of bindings) {
      socket[slot] = (event) => emit(actionCreators[creatorName](event));
    }

    // Tearing down the subscription also tears down the connection.
    return () => {
      socket.close();
    };
  };

  return eventChannel(subscribe);
}

/**
 * Forwards outgoing data from the store to the socket.
 *
 * Waits for each `SOCKET_SEND` action and passes its `payload` to
 * `socket.send`. Never returns on its own.
 *
 * @param {Object} socket - object exposing `send(payload)`.
 */
function* internalListener(socket) {
  for (;;) {
    const { payload } = yield take(types.SOCKET_SEND);
    socket.send(payload);
  }
}

/**
 * Relays everything emitted on the socket channel into the store.
 *
 * Takes one action at a time from `socketChannel` and dispatches it
 * unchanged with `put`. Never returns on its own.
 *
 * @param {*} socketChannel - channel to take incoming actions from.
 */
function* externalListener(socketChannel) {
  for (;;) {
    const incoming = yield take(socketChannel);
    yield put(incoming);
  }
}

/**
 * Drives one socket connection for the lifetime of the socket.
 *
 * Opens an event channel on the socket carried by the triggering action, then
 * runs the incoming and outgoing listeners until a `SOCKET_CLOSE` action is
 * taken.
 *
 * The saga finishes once the race settles instead of looping: the channel's
 * teardown closes the socket, so subscribing to the same socket again would
 * only leave a saga parked on a dead connection. The channel is closed in
 * `finally` so it is also released when this saga is cancelled or a listener
 * throws.
 *
 * @param {Object} action - action whose `payload.socket` is the socket to handle.
 */
function* wsHandling(action) {
  const { socket } = action.payload;
  const socketChannel = yield call(watchMessages, socket);
  try {
    yield race({
      task: [call(externalListener, socketChannel), call(internalListener, socket)],
      cancel: take(types.SOCKET_CLOSE),
    });
  } finally {
    socketChannel.close();
  }
}

/**
 * Root saga: starts a `wsHandling` saga for every `SOCKET_CONNECT` action.
 *
 * @param {*} action - unused; kept so the signature stays the same.
 */
export default function* rootSaga(action) {
  const connectWatcher = takeEvery(types.SOCKET_CONNECT, wsHandling);
  yield connectWatcher;
}

my reducer:

/**
 * Reducer storing the latest socket message per channel.
 *
 * For a `SOCKET_MESSAGE` action whose payload channel is `channel1`,
 * `channel2` or `channel3`, the whole payload is stored under `apichannel1`,
 * `apichannel2` or `apichannel3` via `state.set`. Any other action or channel
 * returns the state untouched.
 *
 * @param {*} state - current state; must expose `set(key, value)`.
 * @param {Object} action - dispatched action.
 * @returns {*} the next state.
 */
function dataReducer(state = initialStateData, action) {
  if (action.type !== types.SOCKET_MESSAGE) {
    return state;
  }

  const { payload } = action;
  switch (payload.channel) {
    case 'channel1':
      return state.set('apichannel1', payload);
    case 'channel2':
      return state.set('apichannel2', payload);
    case 'channel3':
      return state.set('apichannel3', payload);
    default:
      return state;
  }
}

So now, when a new message arrives, I update the state and simply display it on the screen.

Any ideas how I can turn this into the following: put arriving messages into some sort of queue and show them one by one on screen, each for a custom amount of time?


Solution

  • So I did it this way; here is the code, maybe it will be useful for someone:

    // Queue of actions taken from the socket channel that tasksScheduler has not started yet.
    let pendingTasks = [];
    // Actions that displayTask has dispatched and is still holding; the scheduler
    // only starts the next pending action while this list is empty.
    let activeTasks = [];
    
    /**
     * Builds an event channel fed by a socket's event callbacks.
     *
     * The socket's `onopen`, `onclose`, `onerror` and `onmessage` slots are
     * pointed at handlers that convert the event into an action with the
     * matching `actionCreators` function and emit it on the channel. The
     * unsubscribe callback returned to `eventChannel` closes the socket.
     *
     * @param {Object} socket - object with the four handler slots and `close()`.
     * @returns {*} the channel created by `eventChannel`.
     */
    function watchMessages(socket) {
      return eventChannel((emit) => {
        Object.assign(socket, {
          onopen: (event) => emit(actionCreators.socketOpen(event)),
          onclose: (event) => emit(actionCreators.socketClose(event)),
          onerror: (event) => emit(actionCreators.socketError(event)),
          onmessage: (event) => emit(actionCreators.socketMessage(event)),
        });

        const closeSocket = () => {
          socket.close();
        };
        return closeSocket;
      });
    }
    
    /**
     * Pushes outgoing data to the socket.
     *
     * Blocks on each `SOCKET_SEND` action and hands its `payload` to
     * `socket.send`. Loops forever.
     *
     * @param {Object} socket - object exposing `send(payload)`.
     */
    function* internalListener(socket) {
      do {
        const outgoing = yield take(types.SOCKET_SEND);
        socket.send(outgoing.payload);
      } while (true);
    }
    
    /**
     * Routes actions emitted on the socket channel.
     *
     * `SOCKET_MESSAGE` actions are appended to `pendingTasks` for the scheduler
     * to display one at a time. Every other action (open, close, error) is
     * dispatched immediately with `put`. Queuing those as well would hold
     * them back behind every message still waiting to be shown, so for example
     * `SOCKET_CLOSE` would only reach the store after the backlog had drained.
     *
     * NOTE(review): assumes `actionCreators.socketMessage` produces actions of
     * type `types.SOCKET_MESSAGE` — confirm against the actions module.
     *
     * @param {*} socketChannel - channel to take incoming actions from.
     */
    function* externalListener(socketChannel) {
      while (true) {
        const action = yield take(socketChannel);
        if (action.type === types.SOCKET_MESSAGE) {
          pendingTasks = [...pendingTasks, action];
        } else {
          yield put(action);
        }
      }
    }
    
    /**
     * Drives one socket connection for the lifetime of the socket.
     *
     * Opens an event channel on the socket carried by the triggering action,
     * then runs the incoming and outgoing listeners until a `SOCKET_CLOSE`
     * action is taken.
     *
     * The saga finishes once the race settles instead of looping: the channel's
     * teardown closes the socket, so subscribing to the same socket again would
     * only leave a saga parked on a dead connection. The channel is closed in
     * `finally` so it is also released when this saga is cancelled or a
     * listener throws.
     *
     * @param {Object} action - action whose `payload.socket` is the socket to handle.
     */
    function* wsHandling(action) {
      const { socket } = action.payload;
      const socketChannel = yield call(watchMessages, socket);
      try {
        yield race({
          task: [call(externalListener, socketChannel), call(internalListener, socket)],
          cancel: take(types.SOCKET_CLOSE),
        });
      } finally {
        socketChannel.close();
      }
    }
    
    /**
     * Polling loop that starts queued actions one at a time.
     *
     * While nothing is active and something is pending, the head of
     * `pendingTasks` is removed, `displayTask` is forked for it, and the loop
     * waits 300 ms. Otherwise it waits 50 ms before checking again. Loops
     * forever.
     */
    function* tasksScheduler() {
      for (;;) {
        const isIdle = activeTasks.length < 1;
        const hasPending = pendingTasks.length > 0;

        if (!isIdle || !hasPending) {
          // Nothing to start right now; look again shortly.
          yield call(delay, 50);
          continue;
        }

        const nextTask = pendingTasks[0];
        pendingTasks = pendingTasks.slice(1);
        yield fork(displayTask, nextTask);
        yield call(delay, 300);
      }
    }
    
    /**
     * Shows one queued action: marks it active, dispatches it, holds it for
     * 3000 ms, then marks it inactive again.
     *
     * The removal from `activeTasks` sits in `finally` so it also runs when
     * this saga is cancelled or `put`/`delay` throws. Otherwise the task would
     * stay in `activeTasks` and the scheduler, which only starts work while
     * that list is empty, would never start another one.
     *
     * @param {Object} task - the action to dispatch.
     */
    function* displayTask(task) {
      activeTasks = [...activeTasks, task];
      try {
        yield put(task);
        yield call(delay, 3000);
      } finally {
        activeTasks = _.without(activeTasks, task);
      }
    }
    
    /**
     * Root saga: handles every `SOCKET_CONNECT` with `wsHandling` and runs a
     * single `tasksScheduler`.
     *
     * The scheduler is forked once rather than per `SOCKET_CONNECT`. It loops
     * forever over the shared queues, so starting one per connection would
     * pile up an extra polling loop on every reconnect.
     *
     * @param {*} action - unused; kept so the signature stays the same.
     */
    export default function* rootSaga(action) {
      yield [
        takeEvery(types.SOCKET_CONNECT, wsHandling),
        fork(tasksScheduler),
      ];
    }