I have a function that breaks a message down into multiple chunks. I need the chunks of each message to be posted, in order, by my post function. However, I do not want the Observable to block other incoming posts. I think the solution is some combination of the concat operator inside a mergeMap, but I cannot get it to work properly.
I am not sure I can make a diagram but here is my attempt:
-1-2------3|->
--4--5--6|->
desired output:
[4,5,6]
[1,2,3]
I need request 1 to execute before 2, and 2 before 3; likewise 4 before 5, and 5 before 6. In plain English, I think I have an observable of observables, and I want each inner observable to run as its own stream and then collect into a standard array per output stream. I am just not sure exactly how to do this. I've been experimenting with the code for a long time trying to express what I just described, and here is my best attempt:
interface SendInfo {
  message: discord.Message
  content: string
  options?: discord.MessageOptions
}
export const sendMessage$: Subject<SendInfo> = new Subject();
const regex = /[\s\S]{1,1980}(?:\n|$)/g;
export const sentMessages$ = sendMessage$.pipe(
  mergeMap(
    (input: SendInfo):
    Observable<(discord.Message | discord.Message[] | null)[]> => {
      const chunks: string[] = input.content.match(regex) || [];
      const superObservable: Observable<Observable<discord.Message | discord.Message[] | null>> = concat(chunks.map(
        (chunk: string):
        Observable<discord.Message | discord.Message[] | null> => {
          const bound = input.message.channel.send.bind(
            undefined,
            chunk,
            input.options,
          );
          return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
            bound,
          );
        }
      ));
      return superObservable.pipe(
        mergeMap(e => e),
        toArray(),
      );
    }
  ),
  tap((e): void => Utils.logger.fatal(e)),
  share(),
);
My output:
[2019-10-21T17:24:15.322] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg1' } ]
[2019-10-21T17:24:15.324] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg2' } ]
[2019-10-21T17:24:15.325] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg3' } ]
I feel like I'm close to a solution, but I cannot figure out exactly how to merge this into a single array. I also don't know whether it is functionally correct.
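For reference, the chunking step can be exercised on its own. The following is only a minimal sketch: `chunkMessage` and its `limit` parameter are hypothetical names introduced here so the behaviour can be tried with small inputs, and the pattern is the same as `regex` above with the 1980 made configurable.

```typescript
// Hypothetical helper (not part of the original code): splits content into
// chunks of at most `limit` characters, each ending at a newline or at the
// end of the string. Same pattern as `regex` above, with a configurable limit.
function chunkMessage(content: string, limit = 1980): string[] {
  const pattern = new RegExp(`[\\s\\S]{1,${limit}}(?:\\n|$)`, 'g');
  // match() with the g flag returns every match, or null when there is none.
  return content.match(pattern) || [];
}

// Three short lines with a limit of 5: each chunk ends at a newline.
console.log(chunkMessage('aaa\nbbb\nccc', 5)); // [ 'aaa\n', 'bbb\n', 'ccc' ]

// A message shorter than the limit stays in one piece.
console.log(chunkMessage('msg1')); // [ 'msg1' ]
```

One thing this makes visible: a run longer than the limit that contains no newline cannot match at its start, so match() skips the leading characters. For example, chunkMessage('abcdefgh', 5) returns ['defgh'] and the 'abc' is lost, which may matter for very long single-line messages.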
I figured out that the operator I was looking for after the concat call was combineAll(), not toArray(). I also have another implementation that uses promises. I believe both of these should work, but I will post the one I'm more sure of first, which is the promise version.
Implementation one using promises:
export const sentMessages$ = sendMessage$.pipe(
  mergeMap(
    (input: SendInfo):
    Observable<(discord.Message | null)[]> => {
      const chunks: string[] = input.content.match(regex) || [];
      const observables: Observable<(discord.Message | null)[]>[] = chunks.map(
        (chunk: string):
        Observable<(discord.Message | null)[]> => {
          const bound = input.message.channel.send.bind(
            undefined,
            chunk,
            input.options,
          );
          // eslint-disable-next-line max-len
          return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
            bound,
          ).pipe(
            // eslint-disable-next-line comma-dangle
            map((x): (discord.Message | null)[] => [x].flatMap(
              (t): (discord.Message | discord.Message[] | null) => t
            ))
          );
        }
      );
      const promises = observables
        .map(
          (obs: Observable<(discord.Message | null)[]>):
          Promise<(discord.Message | null)[]> => obs.toPromise()
        );
      const reduced = promises
        .reduce(async (promiseChain, currentTask):
        Promise<(discord.Message | null)[]> => [
          ...await promiseChain,
          ...await currentTask,
        ].flatMap((x): (discord.Message | null) => x));
      return from(reduced);
    }
  ),
  share(),
);
Implementation two pure RxJs:
export const sentMessages$ = sendMessage$.pipe(
  mergeMap(
    (input: SendInfo):
    Observable<(discord.Message | null)[]> => {
      const chunks: string[] = input.content.match(regex) || [];
      const observables: Observable<(discord.Message | null)[]>[] = chunks.map(
        (chunk: string):
        Observable<(discord.Message | null)[]> => {
          const bound = input.message.channel.send.bind(
            undefined,
            chunk,
            input.options,
          );
          // eslint-disable-next-line max-len
          return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
            bound,
          ).pipe(
            // eslint-disable-next-line comma-dangle
            map((x): (discord.Message | null)[] => [x].flatMap(
              (t): (discord.Message | discord.Message[] | null) => t
            ))
          );
        }
      );
      return concat(observables).pipe(
        combineAll(),
        map(x => x.flatMap(t => t)),
      );
    }
  ),
  share(),
);
I believe the promise version will work because it is reduced into an explicit chain. I am not sure whether I am missing some nuance with the concat operator, so I'm not 100% sure the pure RxJS version will work. The test bench I wrote is actually not working properly, due to a misunderstanding of how promises execute with the defer operator in RxJS, but I am getting the expected order according to my bench. I believe that misunderstanding was the reason I didn't come up with these solutions easily.
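On the concat nuance: as far as I understand it, concat called with a single array treats that array as one input and simply emits the inner observables, and both mergeMap(e => e) and combineAll() then subscribe to those inner observables without waiting for the previous one to complete. Similarly, toPromise() subscribes as soon as it is called, so mapping every observable to a promise up front would start all sends at once. Spreading the array into concat and collecting with toArray() may be closer to the stated goal. The following is only a sketch under assumptions: `fakeSend`, `sendInOrder` and `FakeMessage` are hypothetical stand-ins (the real `Network.genericNetworkObservable` is not shown, and is assumed here to be lazy), and a VirtualTimeScheduler replaces real network latency so the run is deterministic.

```typescript
import { Observable, VirtualTimeScheduler, concat, defer, from, of } from 'rxjs';
import { delay, mergeMap, tap, toArray } from 'rxjs/operators';

// Virtual clock so the simulated latency runs deterministically.
const scheduler = new VirtualTimeScheduler();
// Records when each simulated request starts and finishes.
const log: string[] = [];

// Hypothetical stand-in for one network send. defer() keeps it lazy:
// nothing starts until concat subscribes to it.
function fakeSend(chunk: string, ms: number): Observable<string> {
  return defer(() => {
    log.push(`start ${chunk}`);
    return of(chunk).pipe(
      delay(ms, scheduler),
      tap(() => log.push(`done ${chunk}`)),
    );
  });
}

// Sends the chunks of ONE message strictly one after another and
// emits a single array with all results once the last send completes.
function sendInOrder(chunks: string[], ms: number): Observable<string[]> {
  const sends = chunks.map(chunk => fakeSend(chunk, ms));
  // Note the spread: concat(sends) would emit the observables themselves.
  return concat(...sends).pipe(toArray());
}

interface FakeMessage { chunks: string[]; ms: number }
const incoming: FakeMessage[] = [
  { chunks: ['1', '2', '3'], ms: 30 }, // slow message
  { chunks: ['4', '5', '6'], ms: 8 },  // fast message
];

const results: string[][] = [];
from(incoming).pipe(
  // mergeMap lets different messages overlap; concat orders chunks within one.
  mergeMap(({ chunks, ms }) => sendInOrder(chunks, ms)),
).subscribe(group => results.push(group));

scheduler.flush(); // run all virtual timers to completion
console.log(results);
console.log(log);
```

In this sketch the fast message finishes first, so results should come out as [4,5,6] followed by [1,2,3], matching the desired output in the question, while the log should show each chunk starting only after the previous chunk of the same message is done.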
[2019-10-23T06:09:13.948] [FATAL] messageWrapper.ts:109 - [
{ channel: { send: [Function] }, content: 'msg7' },
{ channel: { send: [Function] }, content: 'msg8' },
{ channel: { send: [Function] }, content: 'msg9' }
]
[2019-10-23T06:09:14.243] [FATAL] messageWrapper.ts:109 - [
{ channel: { send: [Function] }, content: 'msg4' },
{ channel: { send: [Function] }, content: 'msg5' },
{ channel: { send: [Function] }, content: 'msg6' }
]
[2019-10-23T06:09:14.640] [FATAL] messageWrapper.ts:109 - [
{ channel: { send: [Function] }, content: 'msg1' },
{ channel: { send: [Function] }, content: 'msg2' },
{ channel: { send: [Function] }, content: 'msg3' }
]
✓ should execute concurrently. (753ms)