RxJS and redux-observable: delay(time) is not applied correctly after mapTo()

I am trying to dispatch a 'SET_MIDDLEWARE_TYPE' action with payload.type = 'observable', wait 5 seconds, and then make an API call. Currently, the SET_MIDDLEWARE_TYPE action is not being dispatched. If I remove the delay and the mergeMap, it dispatches that action. Expected: FETCH_USER_WITH_OBSERVABLE_REQUEST --> SET_MIDDLEWARE_TYPE (wait 5 seconds) --> FETCH_USER_WITH_OBSERVABLE_SUCCESS. Actual: FETCH_USER_WITH_OBSERVABLE_REQUEST --> (wait 5 seconds) --> FETCH_USER_WITH_OBSERVABLE_SUCCESS. How ca

RxJS Map array to observable and back to plain object in array

I have an array of objects from which I need to pass each object separately into async method (process behind is handled with Promise and then converted back to Observable via Observable.fromPromise(...) - this way is needed because the same method is used in case just single object is passed anytime; the process is saving objects into database). For example, this is an array of objects: [ { "name": "John", ... }, { "name": "Anna", ... }, { "name": "Joe",, ...

Importing a single rxjs operator not working with react and webpack

I am using redux-observable for react app and webpack for bundling. When I include a specific operator from rxjs like import 'rxjs/add/operator/mapTo'; it doesn't work and throws error TypeError: action$.ofType(...).mapTo is not a function. But when I include complete rxjs library, it works import 'rxjs'; When importing specific operator, my js bundle does contain mapTo code but the methods are not getting included in Observable prototype. I am using webpack for bundling. Do we have

How to use Rxjs 5.5 `first` operator which has mismatch type in arguments?

I'm using an external package which emits an Observable : getValue(key: string | Array<string>, interpolateParams?: Object): Observable<string | any> { return null; } I also use the first lettable operator : import { first } from "rxjs/operators/first"; this.getValue('f').let(first); But I'm getting an error : Type 'Observable' provides no match for the signature '(value: any, index: number, source: Observable): value is any'. I can see where the problem

Cannot find module 'rxjs/testing' from 'jasmine-marbles.umd.js'

I currently want to test my effects with ngrx/effects. I followed the markdown but I get an error when I run my test: Cannot find module 'rxjs/testing' from 'jasmine-marbles.umd.js' Here is my code (for the moment I have no expectations, I just want my test to run): import { TestBed } from '@angular/core/testing' import { provideMockActions } from '@ngrx/effects/testing' import { ReplaySubject } from 'rxjs/ReplaySubject' import { hot, cold } from 'jasmine-marbles' import { Observ

Rxjs accumulating values such as with scan but with the possibility to reset the accumulator over time

I have one Observable, obs1, which represents a stream of numbers over time. I need to accumulate the sum of such numbers and emit it progressively (i.e. a long way to say I need to use scan operator). Then there is a second Observable, obs2, that represents some sort of "reset time". In other words, when obs2 emits, I have to reset the accumulator I have set on obs1 and start summing from 0. I think I have been able to reach the behavior I want with the following code, but I am not sure it is

Rxjs conditional switchMap based on a condition

I have a situation like the following: myObservable1.pipe( switchMap((result1: MyObservable1) => { if (condition) { return myObservable2; } else { return of(null); } }) ).subscribe(myObservable1 | myObservable2) => { So if condition is false I only have to perform one request, if the condition is true, I have to chain the first request to the following one, they are essentially two requests to the server api. Is there a better solution without returning that nu

Rxjs How do I change state of Single<T> object in a reactive way?

How can I rewrite this function in a reactive way (I don't really like modifying the object within .doOnNext(: Single<List<Integer>> getListSingle() { return Observable.range(0, new Random().nextInt()) .toList() .doOnSuccess(list -> { if (list.size() < 10) { list.add(123); } }); }

how to use rxjs bindCallback

I'm trying to use bindCallback for mqtt-client connect-event (https://www.npmjs.com/package/mqtt#event-connect) and getting error. What am I doing wrong? const client = mqtt.connect("...") const clientOnObs = bindCallback(client.on) clientOnObs('connect').subscribe(console.log) error TypeError: Cannot read property '_events' of undefined at _addListener (events.js:203:19) at addListener (events.js:259:10) at Observable._subscribe (/Users/robert.rajakone/repos/2018_diprem/example-

RxJS of operator always returns cold observable

I was playing with RxJS to understand Hot and Cold Observables. However I am confused with the output below code gives : const source$ = of(1,2,3,4).pipe(share()); source$.subscribe(i => console.log('first subscription',i)); setTimeout(() => source$.subscribe(j => console.log('second subscription',j)), 2000); Expected output is that first subscription prints 1 2 3,4 and second prints just 2,3,4 However both subscriptions put out 1,2,3,4. When I replace "of" with interval it works

Rxjs. Nested conditional observable which pause "main" observable

I have an observable: const observable = new Observable(subscriber => { subscriber.next(1); subscriber.next(2); subscriber.next(3); }) and I would like to subscribe in such a way: observable.subscribe({ next(x) { console.log('got value ' + x); if (x == 2) { const observable2 = new Observable(subscriber => { subscriber.next('a'); subscriber.next('b'); subscriber.next('c'); }) observable2.subscribe(...) } }, error(err)

RxJS: Make sure "tap" is fired even after unsubscribe

How can I make sure the tap operator is called even if a subscription is unsubscribed? Imagine the following: function doUpdate() { return this.http.post('somewhere', {}) .pipe( tap(res => console.log(res)) ) } const sub = doUpdate().subscribe(res => /* do something with res */); setTimeout(() => sub.unsubscribe(), 1000) In this case, I just want to prevent the subscribe action from being executed, yet I want to make sure the console log is fired, eve

Rxjs Combine initial Observable with an Observable for changes (into a single Observable)

I have two observables: one that fetches the initial data, and another that should react to changes and apply them accordingly: const initial$: Observable<Report[]>; const changes$: Observable<ObjectChangeEvent<Report>>; Some characteristics are: initial$ has to complete first, before changes can be applied; changes$ can emit 0..n times; the changed array should be the basis for the next change emit. That means only the first change should apply to the initial State. Fo

How to start processing after the user finishes typing with RxJS?

I have an input element where I want to show an autocomplete solution. I am trying to limit the number of HTTP requests with RxJS. What I want: the HTTP request should start only 1 second after the user stops typing. I have this code: of(this.inputBox.nativeElement.value) .pipe( take(1), map((e: any) => this.inputBox.nativeElement.value), // wait 1 second to start debounceTime(1000), // if value is the same, ignore distinctUntilChanged(), // start connection swi

RxJs test for multiple values from the stream

Given the following class: import { BehaviorSubject } from 'rxjs'; import { map } from 'rxjs/operators'; export class ObjectStateContainer<T> { private currentStateSubject = new BehaviorSubject<T>(undefined); private $currentState = this.currentStateSubject.asObservable(); public $isDirty = this.$currentState.pipe(map(t => t !== this.t)); constructor(public t: T) { this.update(t); } undoChanges() { this.currentStateSubject.next(this.t);

Rxjs How to get results using a loop inside of switch map

I have the next code, and it was working properly. to execute a request to my method fetchDropdownDataByFederationId, but now I have a requirement to execute the same method x number of times. fetchInProgress(queryString?): Observable<IPerson[]> { let PersonList: IPerson[] = []; return this.getItems<IPerson[]>('', queryString).pipe( take(1), switchMap((wls: IPerson[]) => { PersonList = [...wls]; //const createdbyIds = [...new Set(wls.map(

Can't find `combineLatest` in RxJS 5.0

The following code is causing an "Observable.combineLatest is not a function" error using RxJS 5.0: let Observable = require('rxjs/Observable.js').Observable; import 'rxjs/add/operator/combineLatest'; Observable .combineLatest([player, spaceShip], (shotEvents, spaceShip) => ({ x: spaceShip ? spaceShip.x : board.canvas.width / 2, timestamp: shotEvents.timestamp })) All other Observables are able to be resolved; the only function not being resolved is my combineLatest. I tried observabl

Choosing the right RxJS construct in order to ensure one subscription completes before another one

I am facing a RxJS issue. My app is currently designed as follows: I have two different clients: ClientA & ClientB that subscribe to two different Observables: ObservableA & ObservableB. Note that the app also mutates a variable called aVariable. Here is the flow: ClientA subscribes to ObservableA. ClientB subscribes to ObservableB. ObservableB subscription read false from aVariable and completes. ObservableA subscription sets aVariable to true and completes (later than Observable

Rxjs Continue stream after subscribe

I have two streams, that looks as follow: const source1 = Rx.Observable.of([1,2,3,4,5]) .map(() => "I am source 1") .do((x) => console.log(x)); const sub1 = source1.subscribe((v) => console.log("Subscribe 2")); const source2 = Rx.Observable.fromEvent(document, 'click') .do(() => console.log("execute!")) .switchMapTo(source1) .do(() => console.log("After switch map.")) .map((x) => "source2"); source2.subscribe((x) => console.log(x)); Without click on docu

Rxjs Chaining Rx interval and delay

I started learning RxJS recently. I was experimenting with it a bit and came across following issue. When chaining together delay and interval I would assume that each value will be emitted after (interval + delay) but this is not the case here. What's more strange to me is that when these two funcs are chained some values are printed at the same time without any delay. How should I understand it? What am I missing? I'm using RxJS 5.0.3 and I was working in jsbin: https://jsbin.com/xoyuded/edi

Rxjs does map().take() reexecute when based on ReplaySubject.asObservable()

I have a ReplaySubject with a buffer size of 1. I am wondering if, when mapping an observable created from calling let observable = subject.asObservable() whether or not the following composition re-executes for each new buffer. observable.map((nextThing) => { console.log(nextThing, 'got one!'); }).take(1) given a stream of Observable<string>'s like "Hey", "Jey" "Fley" would the only output be "Heygot one!"?

Having trouble calling RxJS share using the recommended operator import

I'm trying to understand the best practice for importing rxjs operators It seems like I should import share this way, but, the following doesn't work because it says share expects 0 arguments. I'm not quite sure how to call share correctly. import { share } from 'rxjs/operators'; ... public currentUser: Observable<User> = share(this.currentUser$.asObservable()); Doing it the old way causes no problems. However I seemed to have read that's not the preferred way to import https://www

deferred Rxjs BehaviorSubject mechanism

I have the following requirement. I have an Angular service with a BehaviorSubject. An HTTP request is made, and when it completes the BehaviorSubject.next method is invoked with the value. This value can change during the lifecycle of the single page. Different subscribers are registered to it and get invoked whenever it changes. The problem is that while the HTTP request is pending the BehaviorSubject already contains a default value and subscribers are already immediately getting this val

RxJS observable which emits both previous and current value starting from first emission

I have a BehaviorSubject which emits JavaScript objects periodically. I want to construct another observable which will emit both previous and current values of the underlying observable in order to compare two objects and determine the delta. The pairwise() or bufferCount(2, 1) operators are looking like a good fit, but they start emitting only after buffer is filled, but I require this observable to start emitting from the first event of the underlying observable. subject.someBufferingOperat

Rxjs How to join streams based on a key

This is for redux-observable, but I think the general pattern is pretty generic to rxjs. I have a stream of events (from redux-observable, these are redux actions) and I'm specifically looking to pair up two different types of events for the same "resource" - "resource set active" and "resource loaded" - and emit a new event when these events "match up". The problem is these can come in in any order, for any resources, and can be fired multiple times. You might set something active before it is lo

Rxjs Loop and execute code sequentially

I had to create the code to insert rows in batches of 1K rows, however as the Bookshelf / Knex library runs the code using Promises, I'm having some errors and I need that this loop runs sequentially (only after finishing the first SQL, the second should start). How can I do this with Promises (or Rx)? for (var x = 0; x < parser.getCount(); x++) { let row = parser.getRow(x); if (_.isEmpty(row) || _.isEmpty(row.ch) || _.isEmpty(row.location)) { break; } let locationD

Rxjs observable.subscribe(..).unsubscribe(); anti-pattern.

I've been finding myself writing code that looks like this: class WidgetService { getWidgets() { return this.authService.user.pipe( //authService.user is an Observable, that emits the currently authenticated users. first(user => user!=null), //Make sure a null object isn't coming through switchMap(user => { return this.collection.getWhere("widget.ownerId", "==", user.id); //Get all the widgets for that user }) ); } } class

In RxJS, what is the difference between interval and timeInterval operator?

I tried to google it, but I cannot find any clear answer for that. From the documentation, I noticed that one is operator, and one is function. What is the difference between them? and what should I use in my code? Thanks! Here is the documentation link: https://rxjs-dev.firebaseapp.com/api/operators/timeInterval https://rxjs-dev.firebaseapp.com/api/index/function/interval

RxJS / Redux-observables: How would I test EventEmitter?

I am trying to test that redux actions are fired when the finish event on my upload object is fired. Here's my epic: const uploadFileEpic = (action$, state$, dependencies$) => action$.pipe( ofType(uploadActions.UPLOAD_FILE), mergeMap(({ payload }) => { const { file, masterHandle } = payload; return new Observable(o => { const upload = masterHandle.uploadFile("/", file); const handle = upload.handle; upload.on("finish", () => {

Rxjs Cancellation on unsubscribe only if observable hasn't completed

I have an observable for which I want to call cancellation (teardown) logic when a subscriber unsubscribes from it, but only if the source observable hasn't yet completed (or failed) by itself. The built-in finalize operator lets you register a custom callback for when unsubscription occurs, but it is called whether the unsubscription was caused by the subscriber or by completion of the source observable. I implemented this helper function: function withCancellation(source, onCancel) { return new

RxJS buffer and delay until one other observable emit first time

I'm having a hard time building a complex observable pipe and I would be grateful if someone could help me with it… Context: I have one stream of data that gives me, through Bluetooth, some values which are data frames that I have to decode. This is a BehaviorSubject called RX$. Now on RX$, sometimes I receive Instant Data (INST) and sometimes History Data (HIST). With INST, I receive, among other things, the version and model of the device that is sending data. I successfully generated an observab

rxjs : zip(...promiseArray.map(from)) -> throws "undefined is not a function (near 'scheduler.schedule...)"

Weird behavior with rxjs. getPromise is an async function returning a Promise. When I run this script it will throw an exception TypeError : undefined is not a function (near 'scheduler.schedule...') zip(...[getPromise(), getPromise()].map(from)).subscribe(() => { console.log('hey'); }); But this will work (notice the difference inside map): zip(...[getPromise(), getPromise()].map(p => from(p))).subscribe(() => { console.log('hey'); }); Why the second version works but not
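One plausible reading of the difference between the two versions, sketched without RxJS: Array.prototype.map calls its callback with (value, index, array), so a function passed directly to map receives the index as its second argument. In RxJS 6, from accepts an optional scheduler in that position, which would fit the 'scheduler.schedule' message. The fromLike function below is a hypothetical stand-in that only mimics that call shape; it is not the RxJS implementation.

```typescript
// Hypothetical stand-in for a function with an optional second "scheduler" parameter.
// It is NOT the RxJS `from`; it only mimics the shape of the call.
type SchedulerLike = { schedule: (work: () => void) => void };

function fromLike(input: string, scheduler?: SchedulerLike | number): string {
  if (scheduler === undefined) {
    return `plain(${input})`;
  }
  // When fromLike is handed to map directly, the element index arrives here.
  if (typeof scheduler === "number") {
    return `index-as-scheduler(${input}, ${scheduler})`;
  }
  return `scheduled(${input})`;
}

const inputs = ["a", "b"];

// map(fromLike) forwards (value, index, array), so the index lands in `scheduler`.
const direct = inputs.map(fromLike);

// map(p => fromLike(p)) forwards only the value.
const wrapped = inputs.map((p) => fromLike(p));

console.log(direct);
console.log(wrapped);
```

In the sketch, the direct form reports the index in the scheduler slot while the wrapped form does not, which mirrors why wrapping the call in an arrow function changes the behavior.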

RxJs operator that behaves like withLatestFrom but waits for value of second stream

Hi, I'm looking for an RxJs operator that behaves similarly to withLatestFrom, with the exception that it would wait for the second stream to emit a value instead of skipping it. To be clear: I only want emissions when the first stream emits a new value. So instead of: ---A-----B----C-----D-| ------1----------2----| withLatestFrom ---------B1---C1----D2| I want this behavior: ---A-----B----C-----D-| ------1----------2----| ????????????? ------A1-B1---C1----D2| Is there an operator for

Is there a performance penalty to using a pipeline operator ponyfill instead of .pipe in RxJS

I use a pipeline operator ponyfill, which is just a utility function applyPipe such that applyPipe(x, a, b) is equivalent to b(a(x)) or x |> a |> b (in this example there are 2 functions, but actually it can be any number of functions). In fp-ts this function is called pipe. In my case the function is implemented as export const applyPipe = ( source, ...project ) => { for (const el of project) { source = el(source); } return source; }; (you could also implement it with .r
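For reference, a typed, self-contained sketch of the loop-based applyPipe quoted above, next to a fold-based variant. The names applyPipeReduce, addOne, and double are hypothetical and introduced only for illustration, and the number-only Step type is a simplification. This shows that the two forms agree on results; it is not a benchmark and says nothing about relative speed.

```typescript
// A pipeline step; restricted to numbers to keep the sketch simple (assumption).
type Step = (value: number) => number;

// Loop-based version, following the implementation quoted in the question.
const applyPipe = (source: number, ...project: Step[]): number => {
  for (const el of project) {
    source = el(source);
  }
  return source;
};

// Fold-based variant (hypothetical alternative with the same observable behavior).
const applyPipeReduce = (source: number, ...project: Step[]): number =>
  project.reduce((acc, el) => el(acc), source);

// Hypothetical example steps.
const addOne: Step = (n) => n + 1;
const double: Step = (n) => n * 2;

// Both calls compute double(addOne(3)).
const viaLoop = applyPipe(3, addOne, double);
const viaReduce = applyPipeReduce(3, addOne, double);

console.log(viaLoop, viaReduce);
```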

Rxjs Difference in signature of disposable returned in Rx.Observable.create

I am trying to figure out the 'correct' way of implementing the dispose handler on my custom-created Observable using Rx.Observable.create named dispose function returned: Rx.Observable.create(obs => { //emit values obs.onComplete(); return function dispose() { console.log('diposed'); }; }); anonymous function returned: Rx.Observable.create(obs => { //emit values obs.onComplete(); return () => { console.log('disposed'); }; }); explicit Disposable.create

RxJS Google Maps Markers

I'm trying to retrieve only new "Google Map" Markers from DataBase by utilizing RxJS observables and jQuery $.ajax I'm receiving an array of markers every 5 seconds. When the data inside the array has been changed, i.e new markers appended, I need to filter out only new markers. var oldArray = [["ObjA",3455,643523,2],["ObjB",5346,2134,1],["ObjC",5341,7135,0]]; var newArray = [["ObjA",3455,643523,2],["ObjD",2384,4791,3],["ObjB",5346,2134,1],["ObjC",5341,7135,0],["ObjF",2631,7228,4]]; So here
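The filtering step on its own can be sketched without RxJS or jQuery. This assumes the first element of each marker is a unique name that identifies it, which the excerpt does not confirm; findNewMarkers and the Marker type are hypothetical names used only for illustration.

```typescript
// Marker shape taken from the arrays in the question: [name, number, number, number].
type Marker = [string, number, number, number];

const oldArray: Marker[] = [
  ["ObjA", 3455, 643523, 2],
  ["ObjB", 5346, 2134, 1],
  ["ObjC", 5341, 7135, 0],
];

const newArray: Marker[] = [
  ["ObjA", 3455, 643523, 2],
  ["ObjD", 2384, 4791, 3],
  ["ObjB", 5346, 2134, 1],
  ["ObjC", 5341, 7135, 0],
  ["ObjF", 2631, 7228, 4],
];

// Assumption: the marker name (first element) is a unique key.
function findNewMarkers(previous: Marker[], current: Marker[]): Marker[] {
  const known = new Set(previous.map(([name]) => name));
  return current.filter(([name]) => !known.has(name));
}

const added = findNewMarkers(oldArray, newArray);
console.log(added.map(([name]) => name));
```

With the two arrays from the question, the result contains the ObjD and ObjF entries. In a polling setup the same function could be applied to each consecutive pair of responses.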

Rxjs Angular 5 Firestore join 3 observable database

How can I join 3 Firestore collections together? The answer is as below: ngOnInit() { // this.productService.getImageUrl().subscribe(products=> this.products = products) this.products = this.afs.collection('/products', ref => ref.where('publish', '==', true)) .snapshotChanges().map(actions => { return actions.map(a => { const data = a.payload.doc.data(); const id = a.payload.doc.id; //this.afs.collection('productPhotos').doc(id) return this.afs.collection('users').doc(da

Is there any way to find out which stream triggers merge operator in rxjs?

Let's say there are two streams a$ and b$. I merge a$ and b$ to get the value when either of them emits. So the code would be like, merge(a$, b$).subscribe((val) => // do something); What I'm wondering is if there's any way to find out which stream triggers the operator other than setting some sort of a flag for each stream like below: merge( a$.pipe( tap(() => { fromA = true; fromB = false; }) ), b$.pipe( tap(() => { fromB = true; fromA =

Rxjs How to create own Scheduler?

If you have worked with Schedulers, you probably know that you can use different predefined schedulers like queue, async or asap: of('', queueScheduler) of('', asyncScheduler) of('', asapScheduler) That's all more or less clear. But what if you want to create your own scheduler, for example to add a 5s delay? I could not find any examples/documentation about it, except this outdated SO answer - https://stackoverflow.com/a/30921043/274500

rxjs - Tap vs subscribe

In many articles I find that tap operator is a good way to perform side effects. My question is about a difference between performing side effects using subscribe and tap. Here are examples which are doing actually the same: this.store$ .pipe(tap(x => { this.store = x; })); this.store$ .subscribe(x => { this.store = x; }); Is there any difference in the performance or is there any reason to use one

How do I refactor a traditional synchronous loop with RxJS?

I'm new to RxJS and trying to wrap my brain around how I should be writing my code. I'm trying to write a function that extends an existing http which returns an observable array of data. I'd like to then loop over the array and make an http request on each object and return the new array with the modified data. Here's what I have so far: private mapEligibilitiesToBulk(bulkWarranties: Observable<any[]>): Observable<IDevice[]> { const warranties: IDevice[] = []; bulkWarranti

How to create an RxJS Observable such that it returns a value when call back function completes

I need to create an RxJS Observable such that it returns a value when a callback function completes. Below is the code I have tried. I want 'resources' to be returned to the caller subscribing to loadMarkerImages: function loadMarkerImages(markerNameAndImageUrlMap) { let loader = new PIXI.loaders.Loader(); for (let markerKey in markerNameAndImageUrlMap) { let imageUrl = markerNameAndImageUrlMap[markerKey]; loader.add(markerKey, imageUrl); } Observable.cre

RXJS Emit array item on event trigger

Using RxJS, I'd like to emit each array item on an event/click. I have the code below, which works fine, but is there a cleaner way? const testEventClick = new Subject(); const array = [1, 2, 3, 4, 5]; from(array) .pipe( concatMap(val => { return new Observable(sub => { testEventClick.subscribe(x => { sub.next(val); sub.complete(); }); }); }) ) .subscribe(console.log); testEventClick.next(); testEventClick.next();

Rxjs Possible memory leak in NativeScript app if user reopens his app multiple times

I'm not sure where the bug is; maybe I'm using rxjs in the wrong way. ngDestroy is not working to unsubscribe observables in NativeScript if you close your app and come back to it. I tried to work with takeUntil, but with the same results. If the user closes/opens the app many times, it can cause a memory leak (if I understand the mobile environment correctly). Any ideas? The code below is only a demo. I need to use users$ in many places in my app. Tested with Android sdk emulator and on real de

RxJS: Even if used `shareReplay()` on source observable, `throwError()` gets executed separately for each observable

I'm using the RxJS shareReplay() operator on an observable (courses$) to share the observable stream among two other observables (beginnerCourses$ and advancedCourses$). It's working fine, and a single API call response is shared between both observables on success. But when it comes to errors, these observables don't share the error, and the error is seen to be thrown twice in the browser console. Doesn't the shareReplay() operator share errors as well? Is this intended behavior? const http$ = createHttpObservable('/ap

Process unknown number of observables in RxJS pipe

I make thousands of calls to my server but in order to avoid overloading, I have put in place a concurrency limit of 10 with RxJS: const calls = new Subject(); calls.pipe( mergeAll(10) ).subscribe(); while(int < unknown_number){ calls.next(new Observable((observer) => { // Call the server here })) } The problem is that I don't know how many calls will be made and I need to know when the job is done. One approach is to get when nothing is left in the queue for 5 second

Rxjs How the mergeMap operator works?

I tried to understand, how mergeMap worked in this example and still don't understand. Can you help me? import { interval } from 'rxjs'; import { mergeMap, take } from 'rxjs/operators'; // emit value every 1s const source$ = interval(1000); source$ .pipe( mergeMap( // project val => interval(5000).pipe(take(2)), // resultSelector (oVal, iVal, oIndex, iIndex) => [oIndex, oVal, iIndex, iVal], // concurrent 2 ) ) /* Output:
