首页 > 解决方案 > @Synchronized 方法中的 Kotlin IllegalMonitorStateException

问题描述

我试图在 Kotlin 中重现一个老式的解决方案,以解决具有多线程和共享内存空间的经典消费者-生产者问题。在Java中,我会使用同步方法来访问共享空间。然而,在Kotlin中,带注释的方法似乎@Synchronized是 throwing IllegalMonitorStateException我期待带注释的方法的行为应该与它们在Java中的行为完全相同,但似乎并非如此。我用函数解决了这个问题,synchronized(){}但我仍然对它@Synchronized不起作用感到困惑。这是为什么?

在下面的代码中,Producer 通过增加内部的计数器(Long)来“生成”一个新值SynchronizedBox,Consumer 读取该值,然后将其打印到控制台。

不起作用的 Kotlin 消息框

package concurrency.producer_consumer

class MessageBox(var message: Long = 0): SynchronizedBox {
    private val lock = Object()
    private var empty = true

    @Synchronized
    override fun syncIncrement() {
        while (!empty) {
            lock.wait()
        }

        message++
        empty = false
        lock.notifyAll()
    }

    @Synchronized
    override fun readValue(): Long {
          while (empty) {
              lock.wait()
          }

          val readValue = message
          empty = true
          lock.notifyAll()

          return readValue
    }
}

有效的 Java 变体:

package concurrency.producer_consumer;

public class JBox implements SynchronizedBox {
    private long value = 0;
    private boolean empty = true;

    @Override
    public synchronized void syncIncrement() {
        while (!empty) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }

        value++;
        empty = false;
        notifyAll();
    }

    @Override
    public synchronized long readValue() {
        while (empty) {
            try {
                wait();
            } catch (InterruptedException e) {}
        }

        empty = true;
        return value;
    }
}

实际工作的 Kotlin 版本:

package concurrency.producer_consumer

class MessageBox(var message: Long = 0): SynchronizedBox {
    private val lock = Object()
    private var empty = true

    override fun syncIncrement() {
        synchronized(lock) {
            while (!empty) {
                lock.wait()
            }

            message++
            empty = false
            lock.notifyAll()
        }
    }

    override fun readValue(): Long {
        synchronized(lock) {
            while (empty) {
                lock.wait()
            }

            empty = true
            lock.notifyAll()
            return message
        }
    }
}

其余代码:

消费者:包 concurrency.producer_consumer

class Consumer(private val messageBox: SynchronizedBox): Runnable {

    override fun run() {
        println("consumer thread: ${Thread.currentThread().id}: started")

        while (true) {
            println("consumer: ${messageBox.readValue()}")
            Thread.sleep(1_000)
        }
    }
}

制片人

class Producer(private val messageBox: SynchronizedBox): Runnable {

    override fun run() {
        println("producer thread: ${Thread.currentThread().id}: started")

        while (true) {
            messageBox.syncIncrement()
            Thread.sleep(1_000)
        }
    }
}

界面

package concurrency.producer_consumer

interface SynchronizedBox {
    fun syncIncrement()
    fun readValue(): Long
}

启动器

package concurrency.producer_consumer

fun main() {
    val box: SynchronizedBox = MessageBox()
    val producer1 = Producer(box)
    val consumer = Consumer(box)

    val threadP1 = Thread(producer1)
    val threadC = Thread(consumer)

    threadP1.start()
    threadC.start()
}

标签: javakotlinconcurrency

解决方案


我有类似的问题,所以我没有使用@Synchronized关键字进行注释,而是更改为如下所示的同步调用,它对我有用。

private fun produce() = synchronized(lock){
   ...
}

要查看的Kotlin代码示例:

fun main() {
    val producer = Producer()
    producer.name = "PRODUCER-THREAD"
    producer.start()
    val consumer = Consumer(producer)
    consumer.name = "CONSUMER-THREAD"
    consumer.start()
}

class Consumer(private val producer: Producer) : Thread() {
    override fun run() {
        try {
            while (true) {
                val data = producer.consume()
                println("$data consumed by: ${currentThread().name}")
            }
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }
}

class Producer : Thread() {
    override fun run() {
        try {
            while (true) produce()
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }

    private fun produce() = synchronized(lock) {
        while (messages.size == MAX_SIZE) lock.wait()

        val data = LocalDateTime.now().toString()
        messages.add(data)
        println("Data produced")
        lock.notify()
    }

    fun consume(): String = synchronized(lock) {
        lock.notify()
        while (messages.isEmpty()) lock.wait()

        val data = messages[0]
        println("Data consumed as: $data")
        messages.remove(data)
        return data
    }

    companion object {
        const val MAX_SIZE = 3
        val lock = Object()
        val messages = arrayListOf<String>()
    }
}

推荐阅读