La incorporación de RxJava Observable se considera dañina-Parte 3 | Stelios Franzkakis

En la primera parte de esta serie, describimos los posibles bloqueos que pueden ocurrir al fusionar flujos de RxJava que se ejecutan en subprocesos paralelos, y en la segunda parte, escribimos pruebas y documentación para una solución personalizada—— safeMergeArray()
, Intentará fusionar las secuencias de forma segura sin que la aplicación se bloquee. En este último artículo, estudiaremos la implementación real de nuestra función y confirmaremos que la prueba pasará.
Como ya estamos Nuestra serie, nuestra función deberá utilizarse como alternativa. Completable.mergeArray()
Combine uno o más Completables en uno.
Completable es un tipo Observable, no emite ningún valor, pero emite eventos de finalización o error. Úselo cuando no nos importe el valor devuelto por la operación, solo nos importa el éxito o el fracaso de la operación.
Esto significa que nuestra función deberá ejecutar Completables en el planificador especificado y devolver un evento completo cuando todas las transmisiones se completen correctamente, o devolver un evento de error si al menos uno de ellos indica un error.Por supuesto, debe evitar bloqueos. Completable.mergeArray()
Esto puede suceder cuando las transmisiones se ejecutan en subprocesos paralelos y emiten errores al mismo tiempo.
Cuando se fusiona Completable, el primer error emitido generalmente termina en sentido descendente. En este caso, no queremos terminarlo en el primer error; queremos recopilar los resultados de cada flujo y luego construir nuestra propia lógica para reaccionar. Esto evitará el procesamiento en sentido descendente cuando varios flujos señalen errores al mismo tiempo, lo que es la causa del bloqueo.
Para recopilar los resultados de cada transmisión, necesitamos convertir Completables en Singles Result
Objeto. Single es un tipo observable, solo puede emitir un valor o señalar un error.
Paso 1: Convierta los completos en sencillos
Result
Es una clase sellada personalizada, utilizada para el tipo de retorno de nuestros Singles, representará el evento emitido por el correspondiente Completable:
sealed class Result
object Complete : Result()
class Error(val throwable: Throwable) : Result()
Se puede usar Convertir Completos a Singles toSingleDefault()
Función:
val singles: List<Single<Result>> = completables.map
it.toSingleDefault<Result>(Result.Complete)
.onErrorReturn throwable ->
Result.Error(throwable)
El código anterior acepta algunos Completables y los convierte en una lista de Singles.Si uno Completable
Completado con éxito, correspondiente Single
Devolverá un Result.Complete
Y si emite un error, lo capturaremos y el correspondiente Single
Devolverá un Result.Error(throwable)
De esta manera, podremos determinar más adelante si Completable tuvo éxito o emitió un error.
Paso 2: Singles con cremallera
Ahora que tenemos la lista de solteros, necesitamos usar Single.zip()
operador:
val result = Single.zip(singles) results: Array<Any> ->
results.firstOrNull it is Result.Error ?: Result.Complete
El código anterior esperará a que se ejecute nuestra transmisión y devuelva un Single
Y el resultado de la cirugía.Si al menos una secuencia emitió un error, la combinación Single
Devolverá un Result.Error
Si todas las transmisiones se completan correctamente, devolverá un Result.Complete
.
El último paso: convertirlo de nuevo en Completable
Como estamos trabajando duro para reemplazar Completable.mergeArray()
Todavía tenemos que devolver un Completable
. Esto significará cambiar la «compresión» Single<Result>
volverse Completable
Como último paso:
return result.flatMapCompletable
if (it is Result.Error)
Completable.error(it.throwable)
else
Completable.complete()
¡y muchos más! Logramos fusionar nuestros Completables originales, y si todos estos se completaban con éxito, señalaríamos un evento completo, o si al menos uno de ellos cometiera un error, señalaríamos un evento de error.
Si juntamos todas las partes, tenemos una implementación personalizada que fusiona Completables, una llamada safeMergeArray()
Esto requiere algunos Completable y fusionarlos en uno.Puede reemplazar directamente Completable.mergeArray()
:
Ahora hemos escrito nuestra implementación safeMergeArray()
Función, estamos listos para ejecutar nuestras pruebas nuevamente y esperamos que pasen.