Search code examples
javascriptangularrxjssubject

rxjs ReplaySubject handle


i have a problem with a template i am using in angular 4. This template implement a notification system, where you can add new notifications, but the documentation do not specify how one can delete the elements of the observer ReplaySubject.

The template implement this as a service as follows:

private notificationsList: Notification[] = [];
  // a stream that publishes new notifications only once
  public newNotifications: Subject<Notification> = new Subject<Notification>();

  // `notifications` is a stream that emits an array of the most up to date notifications
  public notifications: ReplaySubject<Notification[]> =
      new ReplaySubject<Notification[]>(1);

  // `updates` receives _operations_ to be applied to our `notifications`
  // it's a way we can perform changes on *all* notifications (that are currently
  // stored in `notifications`)
  public updates: Subject<any> = new Subject<any>();

  // action streams
  public create: Subject<Notification> = new Subject<Notification>();
  // public markThreadAsRead: Subject<any> = new Subject<any>();

  constructor() {
    // recois des operation, et les fais sur la liste interne, puis diffuse le
    // resultat sur notifications
    this.updates.subscribe((ope) => {
      this.notificationsList = ope(this.notificationsList);
      console.log(this.notificationsList);
      this.notifications.next(this.notificationsList);
    });

    this.newNotifications
      .map(function(notification: Notification): INotificationsOperation {
        return (notifications: Notification[]) => {
          return notifications.concat(notification);
        };
      })
      .subscribe(this.updates);

  }

  // an imperative function call to this action stream
  public addNotification(notification: Notification): void {
    this.newNotifications.next(notification);
  }

I try to ask to the owner how i can delete an actual element of the notification list, but he just tell me that i can access the "notifications" subject to receive the last version of it. But do not mention how i can actually delete an element of the list.

Some one know something about?

Thanks!


Solution

  • I added a public function that you can use. I added a comment to let you see which part of the code you can modify if you want to delete elements by name for example, or don't want to resize the list. Explanation at the end of my post.

      private notificationsList: Notification[] = [];
      // a stream that publishes new notifications only once
      public newNotifications: Subject<Notification> = new Subject<Notification>();
      public removeNotificationByIndex$ : Subject<number> = new Subject<number>();
      // `notifications` is a stream that emits an array of the most up to date notifications
      public notifications: ReplaySubject<Notification[]> =
          new ReplaySubject<Notification[]>(1);
    
      // `updates` receives _operations_ to be applied to our `notifications`
      // it's a way we can perform changes on *all* notifications (that are currently
      // stored in `notifications`)
      public updates: Subject<any> = new Subject<any>();
    
      // action streams
      public create: Subject<Notification> = new Subject<Notification>();
      // public markThreadAsRead: Subject<any> = new Subject<any>();
    
      constructor() {
        // recois des operation, et les fais sur la liste interne, puis diffuse le
        // resultat sur notifications
        this.updates.subscribe((ope) => {
          this.notificationsList = ope(this.notificationsList);
          console.log(this.notificationsList);
          this.notifications.next(this.notificationsList);
        });
    
        this.newNotifications
          .map(function(notification: Notification): INotificationsOperation {
            return (notifications: Notification[]) => {
              return notifications.concat(notification);
            };
          })
          .subscribe(this.updates);
    
        this.removeNotificationByIndex$
         .map(function(index: number){
            return (notifications: Notification[]) => {
            // >>>> DELETE METHOD IS TO BE DEFINED DOWN HERE !
            notifications.splice(index,1);
            // >>>> DELETE METHOD IS TO BE DEFINED UP HERE !
          return notifications
         };
         })
         .subscribe(this.updates);
    
      }
    
      // an imperative function call to this action stream
      public addNotification(notification: Notification): void {
        this.newNotifications.next(notification);
      }
    
      // delete the element in the "index" position of the list. 
      // /!\ Resizes the list 
      public removeNotificationsByIndex(index: number): void {
        this.removeNotificationByIndex$.next(index);
      }
    

    What are the changes ?

    public removeNotificationByIndex$ : Subject<number> = new Subject<number>();
    

    This subject will receive (asynchronously) an index, and trigger a process using this index.

     this.removeNotificationByIndex$
     .map(function(index: number){
        return (notifications: Notification[]) => {
        // >>>> DELETE METHOD IS TO BE DEFINED DOWN HERE !
        notifications.splice(index,1);
        // >>>> DELETE METHOD IS TO BE DEFINED UP HERE !
      return notifications
     };
     })
     .subscribe(this.updates);
    

    When the index is emitted (i.e you use the associated imperative function), a function (ES6 arrow function) is generated from it. This is it :

    (notifications: Notification[]) => {
        // >>>> DELETE METHOD IS TO BE DEFINED DOWN HERE !
        notifications.splice(index,1);
        // >>>> DELETE METHOD IS TO BE DEFINED UP HERE !
      return notifications
     };
    

    This function is passed to this.update, which will apply it. In this context, ope is this function. when received, this.notificationList is modified as follow :

    this.notificationsList = ope(this.notificationsList);
    

    Finally, this new list is published to the ReplaySubject notifications:

    this.notifications.next(this.notificationsList);
    

    Which propagate this new list to all its subscribers.

    Voilà :). Good luck !