I am newbie trying out rxjs and nestjs. The use case that I am currently trying to accomplish is for educational purpose. So I wanted to read a json file (throw an observable error in case of the file being empty or cannot be read) using the "fs" module. Now I create an observable by reading the file asynchronously, set the observer in the subject and then subscribe to the subject in the controller. Here is my code in the service
@Injectable()
export class NewProviderService {
private serviceSubject: BehaviorSubject<HttpResponseModel[]>;
// this is the variable that should be exposed. make the subject as private
// this allows the service to be the sole propertier to modify the stream and
// not the controller or components
serviceSubject$: Observable<HttpResponseModel[]>;
private serviceErrorSubject: BehaviorSubject<any>;
serviceErrorSubject$: Observable<any>;
filePath: string;
httpResponseObjectArray: HttpResponseModel[];
constructor() {
this.serviceSubject = new BehaviorSubject<HttpResponseModel[]>([]);
this.serviceSubject$ = this.serviceSubject.asObservable();
this.serviceErrorSubject = new BehaviorSubject<any>(null);
this.serviceErrorSubject$ = this.serviceErrorSubject.asObservable();
this.filePath = path.resolve(__dirname, './../../shared/assets/httpTest.json');
}
readFileFromJson() {
return new Promise((resolve, reject) => {
fs.exists(this.filePath.toString(), exists => {
if (exists) {
fs.readFile(this.filePath.toString(), 'utf-8' , (err, data) => {
if (err) {
logger.info('error in reading file', err);
return reject('Error in reading the file' + err.message);
}
logger.info('file read without parsing fg', data.length);
if ((data.length !== 0) && !isNullOrUndefined(data) && data !== null) {
// this.httpResponseObjectArray = JSON.parse(data).HttpTestResponse;
// logger.info('array obj is:', this.httpResponseObjectArray);
logger.info('file read after parsing new', JSON.parse(data));
return resolve(JSON.parse(data).HttpTestResponse);
} else {
return reject(new FileExceptionHandler('no data in file'));
}
});
} else {
return reject(new FileExceptionHandler('file cannot be read at the moment'));
}
});
});
}
getData() {
from(this.readFileFromJson()).pipe(map(data => {
logger.info('data in obs', data);
this.httpResponseObjectArray = data as HttpResponseModel[];
return this.httpResponseObjectArray;
}), catchError(error => {
return Observable.throw(error);
}))
.subscribe(actualData => {
this.serviceSubject.next(actualData);
}, err => {
logger.info('err in sub', typeof err, err);
this.serviceErrorSubject.next(err);
});
}
Now this is the controller class
@Get('/getJsonData')
public async getJsonData(@Req() requestAnimationFrame,@Req() req, @Res() res) {
await this.newService.getData();
this.newService.serviceSubject$.subscribe(data => {
logger.info('data subscribed', data, _.isEmpty(data));
if (!isNullOrUndefined(data) && !_.isEmpty(data)) {
logger.info('coming in');
res.status(HttpStatus.OK).send(data);
res.end();
}
});
}
The problem I face is that I can get the file details for the first time and the subscription is getting called once > its working fine. On the subsequent requests
Error [ERR_HTTP_HEADERS_SENT]: Cannot set headers after they are sent to the client
at ServerResponse.setHeader (_http_outgoing.js:470:11)
at ServerResponse.header (C:\personal\Node\test-nest.js\prj-sample\node_modules\express\lib\response.js:767:10)
at Ser
and the endpoint /getJsonData results in an error. Could someone help me out. i believe the subscription is not getting properly after the first call, but not sure how to end that and how to resolve that
The problem is that you're subscribing to your serviceSubject
in your controller. Every time a new value is emitted, it will try to send the response. This works the first time, but the second time it will tell you it can't send the same response again; the request has already been handled.
You can use the pipeable first()
operator to complete the Observable after the first value:
@Get('/getJsonData')
public async getJsonData() {
await this.newService.getData();
return this.newService.serviceSubject$.pipe(first())
}
You want your Observable
to be shared (hot), so that every subscriber always gets the same, latest value. That's exactly what a BehaviourSubject
does. So you should not convert your Subject
to an Observable
when you expose it publicly because you will lose this desired behavior. Instead, you can just cast your Subject
to Observable
, so that internally it is still a subject but it will not expose the next()
method to emit new values publicly:
private serviceSubject: BehaviorSubject<HttpResponseModel[]>;
get serviceSubject$(): Observable<HttpResponseModel[]> {
return this.serviceSubject;
}