RxJava Observable to Completable,如何避免toBlocking()

RxJava Observable to Completable,如何避免toBlocking(),第1张

概述我目前在Android上使用RxJava与Kotlin,但我有一个问题,如果不使用toBlocking()我无法解决.我在员工服务中有一个返回Observable>的方法:funall():Observable<List<Employee>>这一切都很好,因为只要员工发生变化,这个Observable就会发布新的员工列表.但是我想从员工那里生成PD

我目前在Android上使用RxJava与Kotlin,但我有一个问题,如果不使用toBlocking()我无法解决.

我在员工服务中有一个返回Observable>的方法:

fun all(): Observable<List<Employee>>

这一切都很好,因为只要员工发生变化,这个Observable就会发布新的员工列表.但是我想从员工那里生成pdf文件,显然不需要每次员工更改时都运行.另外,我想从pdf生成器方法返回一个Completable对象.我想在我的pdf中添加标题,然后遍历员工并计算每个员工的工资,这也会返回一个Observable,这就是我现在使用toBlocking的地方.我目前的做法是:

private fun generatepdf(outputStream: OutputStream): Completable {    return employeeService.all().map { employees ->        try {                addheaderTopdf()                for (i in employees) {                    val calculated = employeeService.calculateWage(i.ID).toBlocking().first()                    // Print calculated to pdf....                }                addFooterTopdf()                return @map Completable.complete()            }            catch (e: Exception) {                return @map Completable.error(e)            }        }.first().toCompletable()

有没有办法让这个代码使用RxJava更清洁?

提前致谢!

解决方法:

免责声明:这个答案是一项正在进行的工作.

基本前提:如果您在流中遇到阻塞,那么您做错了.

注意:没有州必须离开可观察的lambda.

第1步:流式传输整个数据集

输入是员工流.对于每个员工,您需要获得一份工资.让我们把它变成一个流.

/** * @param employeesObservable * Stream of employees we're interested in. * @param wageProvIDer * transformation function which takes an employee and returns a [Single] of their wage. * @return * Observable stream spitting indivIDual [Pair]s of employees and their wages. */fun getEmployeesAnDWagesObservable(        employeesObservable: Observable<Employee>,        wageProvIDer: Function<Employee, Single<Int>>): Observable<Pair<Employee, Int>>? {    val employeesAnDWagesObservable: Observable<Pair<Employee, Int>>    // Each Employee from the original stream will be converted    // to a Single<Pair<Employee, Int>> via flatMapSingle operator.    // Remember, we need a stream and Single is a stream.    employeesAnDWagesObservable = employeesObservable.flatMapSingle { employee ->        // We need to get a source of wage value for current employee.        // That source emits a single Int or errors.        val wageForEmployeeSingle: Single<Int> = wageProvIDer.apply(employee)        // Once the wage from saID source is loaded...        val employeeAnDWageSingle: Single<Pair<Employee, Int> = wageForEmployeeSingle.map { wage ->            // ... construct a Pair<Employee, Int>            employee to wage        }        // This code is not executed Now. It will be executed for each Employee        // after the original Observable<Employee> starts spitting out items.        // After subscribing to the resulting observable.        return@flatMapSingle employeeAnDWageSingle    }    return employeesAnDWagesObservable}

订阅时会发生什么:

>从源头获取员工.
>获取员工的工资.
>吐出一对员工和他们的工资.

这将重复,直到employeesObservable信号onComplete或某些内容因onError而失败.

二手经营者:

> flatMapSingle:将实际值转换为某个转换值的新单个流.
> map:将实际值转换为其他实际值(无嵌套流).

嘿,你是如何将它连接到你的代码:

fun doStuff() {    val employeesObservable = employeeService.all()    val wageProvIDer = Function<Employee, Single<Int>> { employee ->        // Don't Listen to changes. Take first wage and use that.        employeeService.calculateWage(employee.ID).firstOrError()    }    val employeesAnDWagesObservable =             getEmployeesAnDWagesObservable(employeesObservable, wageProvIDer)    // Subscribe...}

二手经营者:

> first:从observable中取出第一个项目并将其转换为单个流.
> timeout:如果你通过网络获得工资,那么一个好主意就是超时工资.

下一步

选项1:在此结束

不订阅,打电话

val blockingIterable = employeesAnDWagesObservable.blockingIterable()blockingIterable.forEach { ... }

并以同步方式处理每个项目.坐下来,找出后续步骤,观看演示文稿,阅读示例.

选项2:添加图层

> .map这些Pair< Employee,Int>中的每一个到一些抽象的pdf构建块.
>通过Observable.fromCallable {…}将页眉和页脚打印机转换为Observables,让它们也返回pdf构建块.
>通过Observable.concat(headerObs,employeeDataObs,footerObs)以顺序方式合并所有这些
>.订阅此结果并开始将pdf构建块写入pdf编写器.
> Todo:

>找出一种在订阅时懒惰地初始化pdf编写器的方法(而不是在构建流之前),
>出错时删除输出,
>完成或出错时关闭输出流.

总结

以上是内存溢出为你收集整理的RxJava Observable to Completable,如何避免toBlocking()全部内容,希望文章能够帮你解决RxJava Observable to Completable,如何避免toBlocking()所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址:https://www.54852.com/web/1118415.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-05-29
下一篇2022-05-29

发表评论

登录后才能评论

评论列表(0条)

    保存