Tags: arrays, rxjs, observable, concatenation, mergemap

In RxJS, how do I execute a sequence inside a mergeMap while returning the array of results in order?


I have a function that breaks a message down into multiple message chunks. These chunks need to be posted to my post function in order. However, I do not want the Observable to block other incoming posts. I think the solution is some combination of the concat operator inside a mergeMap, but I cannot seem to get it to work properly.

I am not sure I can make a diagram but here is my attempt:

-1-2------3|->
--4--5--6|->
desired output:
[4,5,6]
[1,2,3]

I need request 1 to execute before 2, and 2 before 3; likewise 4 before 5, and 5 before 6. In plain English, I think I would have an observable of observables, and I want to map that into observable streams and then map each output stream to a standard array. I am just not sure how to do this exactly. I've been experimenting with the code for a long time trying to express what I just described, and here is my best attempt:

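    import { Observable, Subject, concat } from 'rxjs';
    import { mergeMap, share, tap, toArray } from 'rxjs/operators';
    // discord, Network and Utils come from the surrounding project (imports not shown).

    interface SendInfo {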
        message: discord.Message
        content: string
        options?: discord.MessageOptions
    }
    export const sendMessage$: Subject<SendInfo> = new Subject();

    const regex = /[\s\S]{1,1980}(?:\n|$)/g;
    export const sentMessages$ = sendMessage$.pipe(
        mergeMap(
            (input: SendInfo):
            Observable<(discord.Message | discord.Message[] | null)[]> => {
                const chunks: string[] = input.content.match(regex) || [];
                const superObservable: Observable<Observable<discord.Message | discord.Message[] | null>> = concat(chunks.map(
                    (chunk: string):
                    Observable<discord.Message | discord.Message[] | null> => {
                        const bound = input.message.channel.send.bind(
                            undefined,
                            chunk,
                            input.options,
                        );
                        return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
                            bound,
                        );
                    }
                ));

                return superObservable.pipe(
                    mergeMap(e => e),
                    toArray(),
                );
            }
        ),
        tap((e): void => Utils.logger.fatal(e)),
        share(),
    );

My output:

[2019-10-21T17:24:15.322] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg1' } ]
[2019-10-21T17:24:15.324] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg2' } ]
[2019-10-21T17:24:15.325] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg3' } ]

I feel like I'm close to a solution, but I cannot figure out exactly how to merge this into a single array. I also don't know whether it is functionally correct.


Solution

  • I figured out that the operator I was looking for after the concat call was combineAll(), not toArray(). I also have another implementation that uses promises. I believe both of these should work, but I will post the one I'm more sure of first, which is the promise-based one.

    Implementation one using promises:

        export const sentMessages$ = sendMessage$.pipe(
            mergeMap(
                (input: SendInfo):
                Observable<(discord.Message | null)[]> => {
                    const chunks: string[] = input.content.match(regex) || [];
                    const observables: Observable<(discord.Message | null)[]>[] = chunks.map(
                        (chunk: string):
                        Observable<(discord.Message | null)[]> => {
                            const bound = input.message.channel.send.bind(
                                undefined,
                                chunk,
                                input.options,
                            );
                            // eslint-disable-next-line max-len
                            return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
                                bound,
                            ).pipe(
                                // eslint-disable-next-line comma-dangle
                                map((x): (discord.Message | null)[] => [x].flatMap(
                                    (t): (discord.Message | discord.Message[] | null) => t
                                ))
                            );
                        }
                    );
    
                    const promises = observables
                        .map(
                            (obs: Observable<(discord.Message | null)[]>):
                            Promise<(discord.Message | null)[]> => obs.toPromise()
                        );
    
                    const reduced = promises
                        .reduce(async (promiseChain, currentTask):
                        Promise<(discord.Message | null)[]> => [
                            ...await promiseChain,
                            ...await currentTask,
                        ].flatMap((x): (discord.Message | null) => x));
    
                    return from(reduced);
                }
            ),
            share(),
        );
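
One nuance worth checking in the promise-based version: `toPromise()` subscribes to its source as soon as it is called. If `Network.genericNetworkObservable` is cold (an assumption; its implementation is not shown here), mapping every observable to a promise up front would start all sends at once, and the `reduce` would then only order the awaiting of results. The following is a minimal sketch of that behavior, with a hypothetical `fakeSend` standing in for the network call and `inOrder` as an illustrative helper that delays each call until the previous one has resolved.

```typescript
import { Observable, defer, of } from 'rxjs';

// Records which chunks have actually been "sent" so far.
const started: string[] = [];

// Hypothetical cold send: the side effect runs only on subscription.
function fakeSend(chunk: string): Observable<string> {
    return defer(() => {
        started.push(chunk);
        return of(`sent:${chunk}`);
    });
}

// Eager: toPromise() subscribes immediately, so all three sends start here.
const eagerPromises = ['a', 'b', 'c'].map((chunk) => fakeSend(chunk).toPromise());
const startedAfterEager = [...started];

// Lazy: keep factories and call each one only after the previous result arrived.
async function inOrder<T>(tasks: Array<() => Promise<T>>): Promise<T[]> {
    const results: T[] = [];
    for (const task of tasks) {
        results.push(await task());
    }
    return results;
}

const lazyTasks = ['x', 'y', 'z'].map((chunk) => () => fakeSend(chunk).toPromise());
const startedBeforeLazy = [...started];    // building the factories sends nothing
const ordered = inOrder(lazyTasks);
const startedAfterLazyCall = [...started]; // only 'x' has been added at this point
```

In the `reduce` above, a similar effect could be had by reducing over functions that call `toPromise()` rather than over promises that already exist.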
    

    Implementation two, pure RxJS:

        export const sentMessages$ = sendMessage$.pipe(
            mergeMap(
                (input: SendInfo):
                Observable<(discord.Message | null)[]> => {
                    const chunks: string[] = input.content.match(regex) || [];
                    const observables: Observable<(discord.Message | null)[]>[] = chunks.map(
                        (chunk: string):
                        Observable<(discord.Message | null)[]> => {
                            const bound = input.message.channel.send.bind(
                                undefined,
                                chunk,
                                input.options,
                            );
                            // eslint-disable-next-line max-len
                            return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
                                bound,
                            ).pipe(
                                // eslint-disable-next-line comma-dangle
                                map((x): (discord.Message | null)[] => [x].flatMap(
                                    (t): (discord.Message | discord.Message[] | null) => t
                                ))
                            );
                        }
                    );
    
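                    // Note: given the array itself, concat emits the inner observables as
                    // values; combineAll() then subscribes to all of them together once
                    // that outer stream completes, and emits their latest values as an array.
                    return concat(observables).pipe(
                        combineAll(),
                        map(x => x.flatMap(t => t)),
                    );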
                }
            ),
            share(),
        );
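
Regarding the nuance with `concat`: when it receives the array itself, it emits the inner observables as values rather than subscribing to them one after another, and `combineAll()` then subscribes to all of them together. Spreading the array, `concat(...observables)`, subscribes to each inner observable only after the previous one completes, and `toArray()` then collects the results in order. Below is a minimal sketch under the assumption that each send is cold; `fakeSend` and `finish` are hypothetical helpers that keep a send pending until it is finished by hand, so the start order can be inspected.

```typescript
import { Observable, Subject, concat, defer } from 'rxjs';
import { mergeMap, toArray } from 'rxjs/operators';

const started: string[] = [];
const pending = new Map<string, Subject<string>>();

// Hypothetical cold send: starts on subscription, stays open until finish(chunk).
function fakeSend(chunk: string): Observable<string> {
    return defer(() => {
        started.push(chunk);
        const reply = new Subject<string>();
        pending.set(chunk, reply);
        return reply;
    });
}

// Completes a pending send, as a network response would.
function finish(chunk: string): void {
    const reply = pending.get(chunk);
    if (reply === undefined) {
        throw new Error(`${chunk} was never started`);
    }
    reply.next(`sent:${chunk}`);
    reply.complete();
}

const input$ = new Subject<string[]>();
const results: string[][] = [];

input$.pipe(
    // Outer mergeMap: messages do not block each other.
    // Inner concat(...): chunks of one message are sent strictly one at a time.
    mergeMap((chunks) => concat(...chunks.map(fakeSend)).pipe(toArray())),
).subscribe((batch) => results.push(batch));

input$.next(['1', '2', '3']);
input$.next(['4', '5', '6']);
const startedAfterInputs = [...started]; // only the first chunk of each message

finish('4');
finish('5');
finish('1');
finish('6'); // second message is complete and emitted first
finish('2');
finish('3'); // first message is complete
```

With this ordering of replies, the second message is emitted before the first, matching the desired output in the question, while chunks within each message never overlap.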
    

    I believe the promise-based one will work because it is reduced into an explicit chain. I am not sure whether I am missing some nuance of the concat operator, so I'm not 100% sure the second one will work. The test bench I wrote is actually not working properly, due to my misunderstanding of how promises execute with the defer operator in RxJS, but I am getting the expected order according to my bench. I believe that misunderstanding is the reason I didn't come up with these solutions more easily.

    [2019-10-23T06:09:13.948] [FATAL] messageWrapper.ts:109 - [
      { channel: { send: [Function] }, content: 'msg7' },
      { channel: { send: [Function] }, content: 'msg8' },
      { channel: { send: [Function] }, content: 'msg9' }
    ]
    [2019-10-23T06:09:14.243] [FATAL] messageWrapper.ts:109 - [
      { channel: { send: [Function] }, content: 'msg4' },
      { channel: { send: [Function] }, content: 'msg5' },
      { channel: { send: [Function] }, content: 'msg6' }
    ]
    [2019-10-23T06:09:14.640] [FATAL] messageWrapper.ts:109 - [
      { channel: { send: [Function] }, content: 'msg1' },
      { channel: { send: [Function] }, content: 'msg2' },
      { channel: { send: [Function] }, content: 'msg3' }
    ]
          ✓ should execute concurrently. (753ms)
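
On the test-bench issue with `defer`: a promise starts running when it is created, not when something subscribes to it, so `from(send())` has already triggered the send by the time the observable exists, whereas `defer(() => send())` calls `send` once per subscription. A small sketch, assuming a hypothetical promise-returning `fakeSend`:

```typescript
import { defer, from } from 'rxjs';

// Records every time the hypothetical send function is invoked.
const calls: string[] = [];

function fakeSend(label: string): Promise<string> {
    calls.push(label);
    return Promise.resolve(`sent:${label}`);
}

// Eager: fakeSend runs right here, before anyone subscribes.
const eager$ = from(fakeSend('eager'));

// Lazy: the factory is stored and runs only on subscription.
const lazy$ = defer(() => fakeSend('lazy'));
const callsBeforeSubscribe = [...calls];

lazy$.subscribe();
const callsAfterFirstSubscribe = [...calls];

lazy$.subscribe();  // each subscription invokes the factory again
eager$.subscribe(); // reuses the promise created above, no new call
const callsAfterAll = [...calls];
```

A bench that builds its mock sends with `from(promise)` would therefore start them all at construction time, regardless of how the operators downstream sequence their subscriptions.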