
我目前在Android上使用RxJava与Kotlin,但我有一个问题,如果不使用toBlocking()我无法解决.
我在员工服务中有一个返回Observable>的方法:
fun all(): Observable<List<Employee>>这一切都很好,因为只要员工发生变化,这个Observable就会发布新的员工列表.但是我想从员工那里生成pdf文件,显然不需要每次员工更改时都运行.另外,我想从pdf生成器方法返回一个Completable对象.我想在我的pdf中添加标题,然后遍历员工并计算每个员工的工资,这也会返回一个Observable,这就是我现在使用toBlocking的地方.我目前的做法是:
private fun generatepdf(outputStream: OutputStream): Completable { return employeeService.all().map { employees -> try { addheaderTopdf() for (i in employees) { val calculated = employeeService.calculateWage(i.ID).toBlocking().first() // Print calculated to pdf.... } addFooterTopdf() return @map Completable.complete() } catch (e: Exception) { return @map Completable.error(e) } }.first().toCompletable()有没有办法让这个代码使用RxJava更清洁?
提前致谢!
解决方法:
免责声明:这个答案是一项正在进行的工作.
基本前提:如果您在流中遇到阻塞,那么您做错了.
注意:没有州必须离开可观察的lambda.
第1步:流式传输整个数据集
输入是员工流.对于每个员工,您需要获得一份工资.让我们把它变成一个流.
/** * @param employeesObservable * Stream of employees we're interested in. * @param wageProvIDer * transformation function which takes an employee and returns a [Single] of their wage. * @return * Observable stream spitting indivIDual [Pair]s of employees and their wages. */fun getEmployeesAnDWagesObservable( employeesObservable: Observable<Employee>, wageProvIDer: Function<Employee, Single<Int>>): Observable<Pair<Employee, Int>>? { val employeesAnDWagesObservable: Observable<Pair<Employee, Int>> // Each Employee from the original stream will be converted // to a Single<Pair<Employee, Int>> via flatMapSingle operator. // Remember, we need a stream and Single is a stream. employeesAnDWagesObservable = employeesObservable.flatMapSingle { employee -> // We need to get a source of wage value for current employee. // That source emits a single Int or errors. val wageForEmployeeSingle: Single<Int> = wageProvIDer.apply(employee) // Once the wage from saID source is loaded... val employeeAnDWageSingle: Single<Pair<Employee, Int> = wageForEmployeeSingle.map { wage -> // ... construct a Pair<Employee, Int> employee to wage } // This code is not executed Now. It will be executed for each Employee // after the original Observable<Employee> starts spitting out items. // After subscribing to the resulting observable. return@flatMapSingle employeeAnDWageSingle } return employeesAnDWagesObservable}订阅时会发生什么:
>从源头获取员工.
>获取员工的工资.
>吐出一对员工和他们的工资.
这将重复,直到employeesObservable信号onComplete或某些内容因onError而失败.
二手经营者:
> flatMapSingle:将实际值转换为某个转换值的新单个流.
> map:将实际值转换为其他实际值(无嵌套流).
嘿,你是如何将它连接到你的代码:
fun doStuff() { val employeesObservable = employeeService.all() val wageProvIDer = Function<Employee, Single<Int>> { employee -> // Don't Listen to changes. Take first wage and use that. employeeService.calculateWage(employee.ID).firstOrError() } val employeesAnDWagesObservable = getEmployeesAnDWagesObservable(employeesObservable, wageProvIDer) // Subscribe...}二手经营者:
> first:从observable中取出第一个项目并将其转换为单个流.
> timeout:如果你通过网络获得工资,那么一个好主意就是超时工资.
下一步
选项1:在此结束
不订阅,打电话
val blockingIterable = employeesAnDWagesObservable.blockingIterable()blockingIterable.forEach { ... }并以同步方式处理每个项目.坐下来,找出后续步骤,观看演示文稿,阅读示例.
选项2:添加图层
> .map这些Pair< Employee,Int>中的每一个到一些抽象的pdf构建块.
>通过Observable.fromCallable {…}将页眉和页脚打印机转换为Observables,让它们也返回pdf构建块.
>通过Observable.concat(headerObs,employeeDataObs,footerObs)以顺序方式合并所有这些
>.订阅此结果并开始将pdf构建块写入pdf编写器.
> Todo:
>找出一种在订阅时懒惰地初始化pdf编写器的方法(而不是在构建流之前),
>出错时删除输出,
>完成或出错时关闭输出流.
以上是内存溢出为你收集整理的RxJava Observable to Completable,如何避免toBlocking()全部内容,希望文章能够帮你解决RxJava Observable to Completable,如何避免toBlocking()所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)