Search code examples
rxjsredux-thunksubject-observer

RxJS Subject of Subjects


I'm trying to create a subject of a list of downloading assets that sends actions for each asset, using a Subject of Subjects if that's possible?

   export function onDownloadGuide(action$,store){
  return action$.ofType(DOWNLOAD_GUIDE)
    .mergeMap(() => downloadGuideAssets().map(res => downloadGuideAssetProgress(res)))
}

function downloadGuideAssets(){
  const subject$ = new Subject()
  getAssetList().map((asset) => downloadAsset(asset).map(res => {console.log(res);subject$.next(res)}))
  return subject$.asObservable()
}

function downloadAsset({id,src}){

  const subject$ = new Subject()

  window.resolveLocalFileSystemURL(cordova.file.dataDirectory, dirEntry => {
    dirEntry.getFile(src.substring(src.lastIndexOf('/')+1),{create:true, exclusive:true}, f => {
      fetch(src).then(fetchProgress({onProgress(progress) {console.log('progressevent');subject$.next({id,progress})}}))
        .then(res => res.blob())
        .then(blob =>
          f.createWriter(writer => {
            writer.onwriteend = ()=> subject$.next({id,complete:true})
            writer.write(blob)
          }))
        .catch(err => subject$.next({id,error:err}))

    }, err => subject$.next({id,error:err}) )
  })

  return subject$.asObservable()
}

The files do appear to be getting downloaded etc. I can console log the progress - just nothing is getting fired when I try and map the results.

Is this because I'm starting it with a subscribe?


Solution

  • It can be further simplified like below and use it like downloadGuideAssets().subscribe(), you don't really need a subject

    function downloadGuideAssets(){
      return getAssetList().map((asset) => downloadAsset(asset))
    }
    
    function downloadAsset({id,src}){
    return Observable.create(obs=>{
      window.resolveLocalFileSystemURL(cordova.file.dataDirectory, dirEntry => {
        dirEntry.getFile(src.substring(src.lastIndexOf('/')+1),{create:true, exclusive:false}, f => {
          fetch(src).then(fetchProgress({onProgress(progress) {subject$.next({id,progress})}}))
            .then(res => res.blob())
            .then(blob =>
              f.createWriter(writer => {
                writer.onwriteend = ()=> obs.next({id,complete:true})
                writer.write(blob)
              }))
            .catch(err => obs.error({id,error:err}))
        }, err => obs.error({id,error:err}) )
      })
    })
    }