Search code examples
typescriptrxjsrxjs-pipeable-operatorsrxjs-subscriptions

Adding pipes after subscribing to a push notification service


Situation:
I've encountered a use case for the rxjs Observable system, where I may need to add piped commands to a Subscription after it has been started.

In my case, the application I'm working on has to passively listen to a push notification system. A number of messages can be pushed out over this system, which my system needs to respond to. However, there's a foreseeable case where a dynamically-loaded view that will be implemented in the future will need to add a listener to the push notification system.

Question:
Given that my app is in a state where my Subscription already exists, can I add an additional pipe after .subscribe(() => {}) has been invoked?

// this.something is an Observable<any>, for discussion purposes.
const subscription = this.something.subscribe(() => { // commands });

this.something.pipe(
  map((something) => {
    // ...Commands that I want to add to the subscription...
  })
);

...And if I do that, then what happens, if anything?

Solution:
The two answers by @user2216584 and @SerejaBogolubov both had an aspect of the answer to this question.

My high-level push notification listener service needed to to do two things:

  1. Hold onto the subscription, and
  2. Be able to draw from a list of listeners.

The complication is that each listener needs to be listening for a different message. Put differently, if I receive a message on foo_DEV, the app needs to do something different than if the push notification system pushes a message on bar_DEV.

So, here's what I came up with:

export interface PushNotificationListener {
  name: string,
  onMessageReceived: (msg: PushNotificationMessage) => any,
  messageSubject$: Subject<PushNotificationMessage>
}

export class PushNotificationListenerService {
  private connection$: Observable<PushNotificationConnection>;
  private subscription$: Subscription;

  private listeners: PushNotificationListener[] = [];

  constructor(
    private connectionManager: PushNotificationConnectionManager
  ) {
  }

  connect() {
    // Step 1 - Open the socket connection!
    this.connection$ = this.connectionManager.connect(
      // The arguments for setting up the websocket are unimportant here.
      // The underlying implementation is similarly unimportant.
    );
  } 

  setListener(
    name: string,
    onMessageReceived: (msg: PushNotificationMessage) => any
  ) {
    // Step 3...or maybe 2...(shrug)...
    // Set listeners that the subscription to the high-order connection
    // will employ.
    const newListener: PushNotificationListener = {
      name: name,
      onMessageReceived: onMessageReceived,
      messageSubject$: null
    };

    this.listeners.push(newListener);
  }

  listen() {
    // Step 2 - Listen for changes to the high-order connection observable.
    this.subscription$ = this.connection$
      .subscribe((connection: PushNotificationConnection) => {
        console.info('Push notification connection established');

        for (let listener of this.listeners) {
         listener.messageSubject$ = connection.subscribe(listener.name);
         listener.messageSubject$.subscribe((message: PushNotificationMessage) => {
           listener.onMessageReceived(message);
         }
        }
      },
      (error: any) => {
        console.warn('Push notification connection error', error);
      }
  }
}

I discovered through careful study of the internal code that comprises the core of my push notification system, that we've already got a higher-order Observable. The websocket code creates an observable (connectionManager.connect()), that needs to be cached in the service, and subscribed to. As that code is specific to where I work, I can say no more about it.

However, caching the listeners is important too! The subscribe call in .listen() just iterates through all the attached listeners any time that the connection changes state, so I can extemporaneously add listeners through .addListener(), and because of how rxjs' Observable system inherently works, AND the fact that I'm working from an in-scope list of listeners, I have a system whereby I can dynamically set listeners, even if .connect() is called before any listeners are configured.

This code probably can still benefit from redesign/refactoring, but I have something that works, which is the important first step of any good coding. Thank you all!


Solution

  • [I am editing my answer because the previous answer was as per the very first code shared by the author; As mentioned in the comment, the author has changed/corrected the code] -

    I doubt that the following code will impact anything in subscription -

    this.something.pipe(
      map((something) => {
        // ...Commands that I want to add to the subscription...
      })
    );
    

    You could try a higher order function while initially setting up your observable and if higher order function is in the scope you can reassign it. I also doubt that it will work because of the following reasons -

    1. When Observable is set up, observable keeps the reference of the function passed which will be invoked on subscribe [https://medium.com/@benlesh/learning-observable-by-building-observable-d5da57405d87]. Now if you reassign the higher order function then the observable function still points to the old reference. By reassigning the higher order function, you have not changed your original function reference which was set up when you initially set up the observable.

    2. Assume that for some reason, higher order reassignment works, In that case, also there is a fair chance that before your older higher-order function executes you might have reassigned your higher order function (because if source observable makes async call to backend, while the code is awaited, javascript event loop might have reassigned the higher order function and when async call comes back it will execute the new assigned higher order function). Maybe this code will clarify my point-

    let higherOrderFunc = map(x => x * 2);

    this.something
        .pipe(
              mergeMap(_ => //call to backend; async call),
              higherOrderFunc,
             ).subscribe();
    higherOrderFunc = map(x => x * 3); // this will execute before async call completes