Discussion:
[AngularJS] chaining observables and subscribing to them
Reza Razavipour
2017-04-12 20:13:28 UTC
This is a follow-up to an original posting that I can no longer find :(

At application start time, I need to perform the following async tasks:

- Load a config file.
- Retrieve a server address from the config data and request a new model
every 10 seconds.
- Using the above-mentioned server, make an HTTP GET call and receive a
websocket address.
- Listen for data coming in on the websocket.


I am trying to figure out how to chain these actions together and run them
sequentially.
Each of the above-mentioned tasks is an observable that I need to subscribe
to.

I am trying to figure out how to do the chaining using RxJS and am looking at
switchMap.
That chains all of my observables into one, correct?

If so, where do I specify the subscriptions and the actions to perform when
there is data to process?

Any ideas or explanations?
--
You received this message because you are subscribed to the Google Groups "Angular and AngularJS discussion" group.
To unsubscribe from this group and stop receiving emails from it, send an email to angular+***@googlegroups.com.
To post to this group, send email to ***@googlegroups.com.
Visit this group at https://groups.google.com/group/angular.
For more options, visit https://groups.google.com/d/optout.
Sander Elias
2017-04-13 05:06:02 UTC
Hi Reza,

You get the data handed down to the next observable as a parameter. Also,
there is the .do operator that lets you do side effects.

It would look something like this:

let config;

loadConfig
  // side effect: do doesn't change the observable, it just passes the value along
  .do(cfg => config = cfg)
  .switchMap(cfg => $http.get(cfg.url))
  .switchMap(r => {
    // $http responses carry the parsed body in .data; the property name is a placeholder
    const address = r.data.websocketAddress;
    return setupWSobservable(address);
  })
  .subscribe(msg => handleWsdata(msg));


Typically, you only subscribe once, when you have set up the entire flow.
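
To make the hand-off concrete, here is a minimal sketch that runs without RxJS. MiniObservable is a toy, synchronous stand-in written only for this illustration (it is not the RxJS class), and loadConfig, requestWsAddress and listenOnSocket are hypothetical sources that emit canned values. It shows two things: each operator receives the previous value as a parameter, and nothing runs until the single subscribe at the end. In RxJS 5 the operators have the same names (do and switchMap); later versions rename do to tap.

```typescript
// Toy, synchronous stand-in for an Observable. This is NOT the RxJS class;
// it only mimics how values are handed from one operator to the next.
type Next<T> = (value: T) => void;

class MiniObservable<T> {
  constructor(private readonly producer: (next: Next<T>) => void) {}

  // Emit the given values, in order, to whoever subscribes.
  static of<T>(...values: T[]): MiniObservable<T> {
    return new MiniObservable<T>(next => values.forEach(v => next(v)));
  }

  // Run a side effect, then pass the value along unchanged.
  do(effect: (value: T) => void): MiniObservable<T> {
    return new MiniObservable<T>(next =>
      this.producer(value => {
        effect(value);
        next(value);
      })
    );
  }

  // Hand each value to `project` and forward only the newest inner stream.
  switchMap<R>(project: (value: T) => MiniObservable<R>): MiniObservable<R> {
    return new MiniObservable<R>(next => {
      let newest = 0;
      this.producer(value => {
        const id = ++newest;
        project(value).producer(inner => {
          if (id === newest) next(inner);
        });
      });
    });
  }

  // Nothing upstream runs until this is called.
  subscribe(next: Next<T>): void {
    this.producer(next);
  }
}

// Hypothetical sources with canned values (assumptions for this sketch).
const log: string[] = [];
const loadConfig = MiniObservable.of({ url: 'server.example' });
const requestWsAddress = (url: string) => MiniObservable.of(`ws://${url}/socket`);
const listenOnSocket = (address: string) =>
  MiniObservable.of(`message 1 from ${address}`, `message 2 from ${address}`);

const flow = loadConfig
  .do(config => log.push(`config loaded: ${config.url}`))
  .switchMap(config => requestWsAddress(config.url))
  .switchMap(address => listenOnSocket(address));

const eventsBeforeSubscribe = log.length; // still 0: building the chain runs nothing

flow.subscribe(msg => log.push(msg)); // the one and only subscription
```

Building the chain only describes the work; the config load, the address request and the socket listener all start when subscribe is called, which is why a single subscription at the end is usually enough.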

Regards
Sander
Reza Razavipour
2017-04-20 20:15:48 UTC
So now I have two observables that need to run at the same time, in
parallel, so I merge the last two observables.

getConfigurationSettings()
  .do(config => {
    console.log('we got the configuration settings.');
    this.configSettings = config;
  })
  .switchMap(config => this.askForPatientContextWSAddress())
  .do(config => {
    this.subscriptionService.setWS(this.contextWS);
    this.contextWS.onopen = (evt) => {
      this.contextWS.send("Hello World");
    };
  })
  .switchMap(r => this.processContextChanges())
  .merge(numbers)
  .merge(modelUpdater)
  .subscribe(
    x => {
      console.log('got data ' + x);
    },
    err => {
      console.log(err);
    },
    () => console.log('app exiting'));


This works as expected. In the subscription block where I receive data, how
do I tell which observable the data is from? Yes, the types of data coming
back are different: number, string, or object. Is that what I should key off of?


What is the correct way to manage this?
Reza Razavipour
2017-04-20 20:20:01 UTC
And what if in the future I need to merge with another observable that
returns, let's say, a string? Then I will have strings coming back from 2
different observables.
How do I tell them apart?
Sander Elias
2017-04-21 05:25:28 UTC
Hi Reza,

Well, as usual, it depends on what you need to do. But if you indeed have 2
different observables that result in 2 different things that need
different handling, then you have a reason to use multiple subscribers.
But be careful with side effects. Observables work great if you use pure
functions. But if you mix in side effects, debugging them becomes a
nightmare. If you need side effects, make sure they are putting things out,
not pulling things in. Does that make sense to you?
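
If the streams do stay merged into a single subscription, one common alternative to keying off the runtime type is to tag every value with its source before merging. The sketch below is an illustration under assumptions: the AppEvent type, the source names and the helper functions are all hypothetical, and no RxJS is needed to run it. With RxJS you would apply each helper with .map on its own stream before the .merge, so two streams that both emit strings remain distinguishable.

```typescript
// Hypothetical event type: the `source` field names the stream a value came from.
type AppEvent =
  | { source: 'websocket'; data: string }
  | { source: 'timer'; tick: number }
  | { source: 'model'; rows: string[] };

// Tagging helpers, one per stream (hypothetical names).
const fromWebsocket = (data: string): AppEvent => ({ source: 'websocket', data });
const fromTimer = (tick: number): AppEvent => ({ source: 'timer', tick });
const fromModel = (rows: string[]): AppEvent => ({ source: 'model', rows });

// A pure handler: it only looks at its argument and returns a description.
function describeEvent(event: AppEvent): string {
  switch (event.source) {
    case 'websocket':
      return `ws message: ${event.data}`;
    case 'timer':
      return `tick #${event.tick}`;
    case 'model':
      return `model with ${event.rows.length} rows`;
  }
}

// Simulated merged output: values from three streams, interleaved.
const merged: AppEvent[] = [fromTimer(0), fromWebsocket('hello'), fromModel(['a', 'b'])];
const descriptions = merged.map(describeEvent);
```

Because the handler is a pure function of the tagged value, it can be tested without any observable at all, which fits the advice above about keeping side effects at the edges.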

Regards
Sander
Reza Razavipour
2017-04-21 16:49:01 UTC
Warning... Very long message...

So I changed one of the observables to a promise because it could work as a
promise as well and all is working well now, as far as I can tell.
Do you see anything wrong with my approach?

// single http get call turned into a promise

getDataP(): Promise<Array<string>> {
  return this.http.get(url).toPromise();
}


// one observable that reads a configFile

getConfigurationSettings(url) : Observable<string[]> {
return this.http.get(url)
.map((res:Response) => {
return res.json()
})
.catch((error:any) => Observable.throw(error.json().error ||
'Failed to get flowsheet configuration settings'));
}


// another http get call as an observable...

// this also can be made into a promise but it is fine for now

// just one http get call

askForPatientContextWSAddress() : Observable<string[]> {
return this.http.get(this.patientContextProviderURL)
.map((res:Response) => {
return ['ws://localhost:3006'];
})
.catch((error: Response | any) => {
if (error instanceof Response && error.status !== 200) {
console.log('failed to get valid response code:' + error.status
+ 'error text: ' + error.text());
}
return Observable.throw(error);
})
}



//code to call the http get call every 15 sec

startGettingData () {
setInterval(

console.log('startGettingData');

this.getDataP().then(function (stringArray) {
console.log(stringArray);
return stringArray;
}), 15000);
}


// observable to listen for websocket messages

// and process them

processWebscoketMessages(): Observable<any>{
console.log('listenForContextChanges');

return Observable.fromEvent(this.contextWS,'message')
.map(res=>"From WS: " + res.data);
}


// just a test observable code to generate a value every second

var numbers = Observable.interval(1000);


// the final combination of promises and observables...

this.getConfigurationSettings()
  .do(config => {
    // nothing async
    console.log('we got the configuration settings.');
    this.configSettings = config;
  })
  .switchMap(res => this.askForPatientContextWSAddress())
  .do(x => {
    // nothing async
    // construct the websocket...
    this.contextWS = new WebSocket(wsAdress);
    this.contextWS.onopen = (evt) => {
      this.contextWS.send("Hello World");
    };
  })
  .switchMap(r => this.processWebscoketMessages())
  // in the following do block/side effect, I start the timer-based call to the
  // http get promise.
  .do(() => this.startGettingData())
  .merge(numbers)
  .subscribe(
    x => {
      console.log('got data');
      // process the data, synchronous.
    },
    err => {
      // Log errors if any
      console.log(err);
    },
    () => console.log('app exiting'));



How does this structure look?
Reza Razavipour
2017-04-21 17:34:40 UTC
Actually, the promise is really not working :(
It is supposed to get called every 15 seconds, but it gets called at
erratic times and the promise never completes; the then part is never
called.

Any ideas what the problem might be?


I get a mysterious syntax error:

Uncaught SyntaxError: Unexpected identifier

I have not chased it down, but I think the promise is broken.
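
A possible cause, going only by the startGettingData code posted earlier in this thread: setInterval is given statements separated by a semicolon rather than a function, which is not valid inside an argument list, and the promise call is written so that it would run once immediately instead of on every tick. The sketch below wraps the work in an arrow function. The names startPolling, fetchData and onData are hypothetical, and the canned Promise stands in for the real http call.

```typescript
// Hypothetical helper: calls `fetchData` every `periodMs` milliseconds and
// hands each result to `onData`. Returns a function that stops the polling.
function startPolling(
  fetchData: () => Promise<string[]>,
  onData: (rows: string[]) => void,
  periodMs: number
): () => void {
  // The callback must be a function; setInterval invokes it on each tick.
  const timer = setInterval(() => {
    fetchData()
      .then(onData)
      .catch(err => console.log('poll failed', err));
  }, periodMs);
  return () => clearInterval(timer);
}

// Usage with a canned promise in place of the real request (an assumption).
const stopPolling = startPolling(
  () => Promise.resolve(['a', 'b']),
  rows => console.log(rows),
  15000
);

// Stop right away here so the example does not keep the process alive.
stopPolling();
```

Staying within RxJS is another option: Observable.interval(15000) followed by switchMap onto the http call gives the same periodic request as an observable, which can then be merged into the main flow instead of being started from a do side effect.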