arrow-effects-rx2-extensions / arrow.effects.rx2.extensions.observablek.async / asyncF


@JvmName("asyncF") fun <A> asyncF(arg0: ((Either<Throwable, A>) -> Unit) -> Kind<ForObservableK, Unit>): ObservableK<A>

async variant that can suspend side effects in the provided registration function.

The passed in function is injected with a side-effectful callback for signaling the final result of an asynchronous process.

import arrow.effects.rx2.*
import arrow.effects.rx2.extensions.observablek.async.*
import arrow.core.*

import arrow.effects.*
import arrow.effects.typeclasses.Async

fun main(args: Array<String>) {
  fun <F> Async<F>.makeCompleteAndGetPromiseInAsync() =
    asyncF<String> { cb: (Either<Throwable, String>) -> Unit ->
      Promise.uncancelable<F, String>(this).flatMap { promise ->
        promise.complete("Hello World!").flatMap {
          promise.get().map { str -> cb(Right(str)) }

  val result = ObservableK.async().makeCompleteAndGetPromiseInAsync()

See Also