首页 > 解决方案 > 如何使用执行前一个作业的结果来启动两个 Coroutines 作业?

问题描述

我必须使用以前工作的结果来启动两个 Coroutines 工作。我尝试使用 job.await() 方法进行异步,但它不起作用。这是我的代码

 CoroutineScope(IO).launch {  
        val job  = async {
            val cartList = mutableListOf<Basket>()
            databaseReference.child("Cart").child("Anonymous")
                .addListenerForSingleValueEvent(object : ValueEventListener {
                    override fun onCancelled(p0: DatabaseError) {

                    }

                    override fun onDataChange(dataSnapshot: DataSnapshot) {
                        if (dataSnapshot.exists()) {

                            for (data in dataSnapshot.children) {
                                val cart = data.getValue(Basket::class.java)
                                cartList.add(cart!!)  //I'm getting this thing
                            }
                        }
                    }
                })
            cartList  // But it is returning zero, Why??
        }.await()

        val job2 = async { 
            for(result in job){} 
        }

      val job3 = async { 
            for(result in job){} 
        }
} 

但是每次我调用此方法时,它都会给出 0。为什么会这样,当我能够从 Firebase 数据库中获取数据时。谁能告诉我,这里有什么问题..

编辑:

   val job = withContext(Dispatchers.IO) {
   val data = callbackFlow<List<Basket>>      
  {           
    databaseReference.child("Cart").child("Anonymous")                                  
    .addListenerForSingleValueEvent(object : 
    ValueEventListener {   

 override fun onCancelled(p0: DatabaseError) {

                            }

                            override fun onDataChange(dataSnapshot: DataSnapshot) {
                                if (dataSnapshot.exists()) {
                                    val cartList = mutableListOf<Basket>()
                                    for (data in dataSnapshot.children) {
                                        val cart = data.getValue(Basket::class.java)
                                        cartList.add(cart!!)  //I'm getting this thing
                                    }
                                    offer(cartList)
                                    channel.close()
                                }
                            }
                        })
                    awaitClose()
                }.single()
                data
            }

  val job2 = withContext(Dispatchers.IO) {
            val data = callbackFlow<List<Items>> {
                val itemsList = mutableListOf<Items>()
                for (items in job) {
                    databaseReference.child("Items").child(items.itemsId!!)
                        .addListenerForSingleValueEvent(object : ValueEventListener {
                            override fun onCancelled(p0: DatabaseError) {

                            }

                            override fun onDataChange(dataSnapshot: DataSnapshot) {
                                if (dataSnapshot.exists()) {
                                    val item = dataSnapshot.getValue(Items::class.java)
                                    itemsList.add(item!!)
                                    offer(itemsList) // exception here because channel is closed.
                                }
                            }
                        })
                }
                channel.close()  // here channel is closed 
                awaitClose()
            }.single()
            data
        }

        val job3 = withContext(Dispatchers.IO) {
            val data = callbackFlow<List<Pair<String, String>>> {
                val priceList = mutableListOf<Pair<String, String>>()
                for (weights in job) {
                    databaseReference.child("Quantities").child(weights.itemsId!!)
                        .child(weights.itemWeight!!)
                        .addListenerForSingleValueEvent(object : ValueEventListener {
                            override fun onCancelled(p0: DatabaseError) {

                            }

                            override fun onDataChange(dataSnapshot: DataSnapshot) {
                                if (dataSnapshot.exists()) {
                                    val key = dataSnapshot.key
                                    val value = dataSnapshot.value as String
                                    val myPair = Pair(key!!, value)
                                    priceList.add(myPair)
                                    offer(priceList)
                                }
                            }
                        })
                }
                channel.close()
                awaitClose()
            }.single()
            data
    }
}

if(job2.isNotEmpty() && job3.isNotEmpty()){
          val myCartList = mutableListOf<Cart>()
          for((counter, cartItem) in job2.withIndex()){
              val quantityPair  = job3[counter]
              val cart = Cart(cartItem.id!!,cartItem.url!!,cartItem.name!!,quantityPair.first,quantityPair.second)
              myCartList.add(cart)
          }
          _cartData.value = myCartList
      }
  }

我用这种方法,但它不工作..

标签: androidmultithreadingfirebase-realtime-databasecoroutinekotlin-coroutines

解决方案


你在这里处理回调。到cartList返回的时候,onDataChanged还没有被调用,所以列表是空的。callbackFlow在这种情况下,我会使用使用 builder 构建的 Flow 。像这样的东西:

  • 流动方法

将此流添加为成员变量。

val myFlow = callbackFlow<List<Basket>> {
    databaseReference.child("Cart").child("Anonymous")
        .addListenerForSingleValueEvent(object : ValueEventListener {
            override fun onCancelled(p0: DatabaseError) {

            }

            override fun onDataChange(dataSnapshot: DataSnapshot) {
                if (dataSnapshot.exists()) {
                    val cartList = mutableListOf<Basket>()
                    for (data in dataSnapshot.children) {
                        val cart = data.getValue(Basket::class.java)
                        cartList.add(cart!!)  //I'm getting this thing
                    }
                    offer(cartList)
                    channel.close()
                }
            }
        })
    awaitClose()
}

在需要时使用调用此方法的 Flow:

fun getBasketList() = myScope.launch(Dispatchers.IO) {//Dispatchers.IO because I assume you're doing something meaningful with it inside async blocks
    val basketList = myFlow.single()
    val job2 = async {
        for(result in basketList){}
    }

    val job3 = async {
        for(result in basketList){}
    }
}

PS:不要忘记提出请求,否则您将无法从 Flow 中获得任何信息。


编辑 1

  • suspendCancellableCoroutine方法

将此方法添加到您的课程中:

suspend fun getBasketList(): List<Basket> = suspendCancellableCoroutine { continuation ->
    databaseReference.child("Cart").child("Anonymous")
        .addListenerForSingleValueEvent(object : ValueEventListener {
            override fun onCancelled(p0: DatabaseError) {
                continuation.resumeWithException(p0)
            }

            override fun onDataChange(dataSnapshot: DataSnapshot) {
                if (dataSnapshot.exists()) {
                    val cartList = mutableListOf<Basket>()
                    for (data in dataSnapshot.children) {
                        val cart = data.getValue(Basket::class.java)
                        cartList.add(cart!!)  //I'm getting this thing
                    }
                    continuation.resume(cartList)
                }
            }
        })

    //MAKE YOUR REQUEST RIGHT HERE
}

还要添加这个方法,它将调用前一个来启动请求:

fun requestBasketList() = myScope.launch(Dispatchers.IO) {
    val basketList = getBasketList()
    val job2 = async {
        for(result in basketList ){}
    }

    val job3 = async {
        for(result in basketList ){}
    }
}

编辑 2

如果您需要继续使用购物篮列表并对其中的每个项目提出其他请求,您可以执行以下操作:

val basketList = getBasketList().single()
val cartList = mutableListOf<Cart>()
val jobs = basketList.map {
    launch {
        val result = //MAKE YOUR FIREBASE REQUEST HERE
        val cart = // Work with 'result' and make your transformations here
        cartList.add(cart)
    }
}
jobs.joinAll()

推荐阅读