I need to connect rn-fetch-blob's onData callback method to an observable.
As far as I know, this is not an event, fromEventPattern couldn't be used. I can't see how to use create if this is the solution to my problem.
I have found bindCallback that looked promising, but the docs says that I probably should use fromEvent instead:
Note that the Observable created by the output function will always emit a single value and then complete immediately. If func calls the callback multiple times, values from subsequent calls will not appear in the stream. If you need to listen for multiple calls, you probably want to use fromEvent or fromEventPattern instead.
I need to listen to multiple calls indeed.
Anyway I'm trying to use bindCallback on an object method as shown in the doc. In my Typescript file:
import { bindCallback } from 'rxjs';
In my class:
private emitter!: Observable<any>;
in a private method:
RNFetchBlob.fs
.readStream(
filePath,
"utf8",
-1,
10
)
.then(ifstream => {
ifstream.open();
this.emitter = bindCallback(ifstream.onData);
but it fails to compile:
error TS2322: Type '() => Observable<string | number[]>' is not assignable to type 'Observable<any>'.
Property '_isScalar' is missing in type '() => Observable<string | number[]>'.
I really can't see how to use fromEvent in my case.
Any help appreciated.
EDIT: Added working code for those looking for the answer:
RNFetchBlob.fs
.readStream(
// file path
peripheral.name,
// encoding, should be one of `base64`, `utf8`, `ascii`
"utf8",
// (optional) buffer size, default to 4096 (4095 for BASE64 encoded data)
// when reading file in BASE64 encoding, buffer size must be multiples of 3.
-1,
10
)
.then(ifstream => {
ifstream.open();
this.emitter = new Observable(subscriber => {
ifstream.onData(chunk => {
// chunk will be a string when encoding is 'utf8'
logging.logWithTimestamp(`Received [${chunk}]`);
subscriber.next(chunk);
});
ifstream.onError(err => {
logging.logWithTimestamp(`oops [${err}]`);
subscriber.error(err);
});
ifstream.onEnd(() => {
subscriber.complete();
});
});
this.rxSubscription = this.emitter
.pipe(
concatMap(value =>
this.handleUpdatedValuesComingFromCSVFile(value)
)
)
.subscribe();
Not too familiar with rn-fetch-blob
but hope you get the idea, also you return an function to run clean up logic.
const onDataObserevable=new Obserevable(obs=>{
ifstream.onData(data=>obs.next(data))
return ()=>{... you can add some clean up logic here like unsubcribe from source onData event}
});
UPDATE
converted the whole chain to observable, hope you get the idea
from(RNFetchBlob.fs.readStream(
peripheral.name,
"utf8",
-1,
10
))
.pipe(mergeMap(ifstream => {
ifstream.open();
return new Observable(subscriber => {
ifstream.onData(chunk => {
// chunk will be a string when encoding is 'utf8'
logging.logWithTimestamp(`Received [${chunk}]`);
subscriber.next(chunk);
});
ifstream.onError(err => {
logging.logWithTimestamp(`oops [${err}]`);
subscriber.error(err);
});
ifstream.onEnd(() => {
subscriber.complete();
});
});),
mergeMap(value =>this.handleUpdatedValuesComingFromCSVFile(value)
)
)