EventBus/PubSub vs (reactive extensions) RX with respect to code clarity in a single threaded application

Here is what I see as the benefits of using reactive event streams in a single-threaded, synchronous application.

1. More declarative, fewer side effects, and less mutable state. Event streams can encapsulate logic and state, potentially leaving your code without side effects or mutable variables. Consider an application that counts button clicks and displays the …


Split Rx Observable into multiple streams and process individually

Easy as pie: just use filter. An example in Scala:

    import rx.lang.scala.Observable

    val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a")
    val hotO: Observable[String] = o.share
    val aSource: Observable[String] = hotO.filter(x => x == "a")
    val bSource: Observable[String] = hotO.filter(x => x == "b")
    val cSource: Observable[String] = hotO.filter(x => x == …
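The idea of splitting one shared source by filtering can be sketched without any Rx library. The `Stream` class below is a hypothetical, minimal push-based stream written only for illustration; it is not the RxScala or RxJS API, and it ignores completion, errors, and unsubscription.

```typescript
// Hypothetical minimal push-based stream (illustration only, not an Rx API).
type Listener<T> = (value: T) => void;

class Stream<T> {
  private listeners: Listener<T>[] = [];

  // Register a listener that receives every value emitted from now on.
  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }

  // Push one value to every registered listener.
  emit(value: T): void {
    for (const listener of this.listeners) {
      listener(value);
    }
  }

  // Derive a new stream that forwards only the values matching the predicate.
  filter(predicate: (value: T) => boolean): Stream<T> {
    const out = new Stream<T>();
    this.subscribe(value => {
      if (predicate(value)) {
        out.emit(value);
      }
    });
    return out;
  }
}

// One shared source, three derived streams, each processed on its own.
const source = new Stream<string>();
const aSource = source.filter(x => x === "a");
const bSource = source.filter(x => x === "b");
const cSource = source.filter(x => x === "c");

const seenA: string[] = [];
const seenB: string[] = [];
const seenC: string[] = [];
aSource.subscribe(x => { seenA.push(x); });
bSource.subscribe(x => { seenB.push(x); });
cSource.subscribe(x => { seenC.push(x); });

for (const x of ["a", "b", "c", "a", "b", "b", "b", "a"]) {
  source.emit(x);
}
```

Each value is emitted once by the source and reaches only the derived stream whose predicate accepts it, which is the behaviour the shared (hot) observable plus filter gives you in Rx.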


RxJS takeWhile but include the last value

Since RxJS 6.4.0 this is possible with takeWhile(predicate, true). The optional inclusive parameter to takeWhile was proposed in this PR: https://github.com/ReactiveX/rxjs/pull/4115

Without it, there are at least two possible workarounds. Using concatMap():

    of('red', 'blue', 'green', 'orange').pipe(
      concatMap(color => {
        if (color === 'green') {
          return of(color, null);
        }
        return of(color);
      }),
      takeWhile(color => color),
    ) …
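To make the "inclusive" semantics concrete, here is a small sketch over a plain array rather than an Observable. The function name `takeWhileInclusive` is hypothetical and is not part of RxJS; it only mirrors what the excerpt says takeWhile(predicate, true) does, namely that the first value failing the predicate is still passed through before the sequence ends.

```typescript
// Hypothetical helper (not an RxJS operator): keep values while the predicate
// holds, and also keep the first value for which it fails.
function takeWhileInclusive<T>(
  source: readonly T[],
  predicate: (value: T) => boolean,
): T[] {
  const result: T[] = [];
  for (const value of source) {
    result.push(value);        // emit the value first...
    if (!predicate(value)) {   // ...then stop if it failed the predicate
      break;
    }
  }
  return result;
}

const colors = ["red", "blue", "green", "orange"];

// Stop at "green", but include it in the output.
const taken = takeWhileInclusive(colors, color => color !== "green");
console.log(taken); // [ 'red', 'blue', 'green' ]
```

A non-inclusive version would check the predicate before pushing, so it would stop at 'blue'.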


Spring 5 WebClient using ssl

It looks like Spring 5.1.1 (Spring Boot 2.1.0) removed HttpClientOptions from ReactorClientHttpConnector, so you cannot configure options while creating an instance of ReactorClientHttpConnector. One option that works now is:

    val sslContext = SslContextBuilder
        .forClient()
        .trustManager(InsecureTrustManagerFactory.INSTANCE)
        .build()
    val httpClient = HttpClient.create().secure { t -> t.sslContext(sslContext) }
    val webClient = WebClient.builder().clientConnector(ReactorClientHttpConnector(httpClient)).build()

Basically while creating the HttpClient, we are …


How to properly handle onError inside RxJava (Android)?

.doOnError() is an operator and, as such, is not part of the Subscriber. Therefore, having a .doOnError() does not count as an implemented onError(). As for the question in one of the comments: of course it is possible to use lambdas. In this case, simply replace

    .doOnError(throwable -> L.e(TAG, "Throwable " + throwable.getMessage()))
    .subscribe(s -> …


IConnectableObservables in Rx

Short answer: IConnectableObservable represents a pending hot observable that can be shared with multiple subscribers. Calling IConnectableObservable.Connect() causes the change to hot (it subscribes to the cold source observable).

Long answer: A cold observable (like Observable.Range) replays the sequence for each subscriber. It's analogous to a stopwatch, where every subscriber is given their own stopwatch. The …
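The difference between a cold source and a connectable one can be sketched in plain TypeScript. This is only an analogy: `Cold` and `Connectable` are hypothetical names invented for this sketch, not the Rx.NET types, and the sketch leaves out completion, errors, and disposal.

```typescript
type Observer<T> = (value: T) => void;

// A cold source is modelled as a function that runs from scratch
// for every observer handed to it.
type Cold<T> = (observer: Observer<T>) => void;

let runs = 0; // counts how many times the source sequence is produced

const range: Cold<number> = observer => {
  runs++;
  for (let i = 1; i <= 3; i++) {
    observer(i);
  }
};

// Hypothetical connectable wrapper: observers register first, and the
// source is run exactly once, when connect() is called.
class Connectable<T> {
  private observers: Observer<T>[] = [];
  private source: Cold<T>;

  constructor(source: Cold<T>) {
    this.source = source;
  }

  subscribe(observer: Observer<T>): void {
    this.observers.push(observer);
  }

  connect(): void {
    this.source(value => {
      for (const observer of this.observers) {
        observer(value);
      }
    });
  }
}

// Cold: each subscriber triggers its own run of the sequence.
const coldA: number[] = [];
const coldB: number[] = [];
range(v => { coldA.push(v); });
range(v => { coldB.push(v); });
const runsAfterCold = runs; // 2

// Connectable: nothing happens until connect(), then one run is shared.
const shared = new Connectable(range);
const hotA: number[] = [];
const hotB: number[] = [];
shared.subscribe(v => { hotA.push(v); });
shared.subscribe(v => { hotB.push(v); });
const runsBeforeConnect = runs; // still 2
shared.connect();
const runsAfterConnect = runs; // 3
```

In the cold case the source ran once per subscriber; in the connectable case both subscribers received values from a single run that started only at connect().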


How to create an Observable from OnClick Event Android?

You would do something like this:

    Observable<View> clickEventObservable = Observable.create(new Observable.OnSubscribe<View>() {
        @Override
        public void call(final Subscriber<? super View> subscriber) {
            viewIWantToMonitorForClickEvents.setOnClickListener(new View.OnClickListener() {
                @Override
                public void onClick(View v) {
                    // Stop forwarding clicks once the subscriber has unsubscribed
                    if (subscriber.isUnsubscribed()) return;
                    subscriber.onNext(v);
                }
            });
        }
    });

    // You can then apply all sorts of operations here;
    // subscribe() is what returns the Subscription
    Subscription subscription = clickEventObservable.flatMap(/* */).subscribe();

    // Unsubscribe …


Recommended reading/tutorials to understand reactive-banana FRP library [closed]

Unfortunately, I haven’t written any comprehensive documentation or tutorials yet, mainly because the reactive-banana library is still somewhat in flux. This means that, at the moment, you’ll have to figure things out yourself from various sources, backed by a reasonably strong Haskell knowledge. What I can do here is to list the various sources and …
