首页 > 解决方案 > 如何使用 Java 远程方法调用在分布式应用程序中实现任务包?

问题描述

服务器代码

    package server;
 
import java.util.List;
 
public interface PrimeTask {
    List<Long> getListofPrimes(Long n);
    Boolean isPrime(Long n);
}

    package server;
 
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.util.List;
 
public interface PrimeCompute extends Remote {
    List<Long> getListofPrimes(PrimeTask t, Long n) throws RemoteException;
    Boolean isPrime(PrimeTask t, Long n) throws RemoteException;
}

    package server;
 
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import java.util.List;
 
public class PrimeComputeEngine implements PrimeCompute{
    public List<Long> getListofPrimes(PrimeTask t, Long n) throws RemoteException
    {
        return t.getListofPrimes(n);
    }
 
    public Boolean isPrime(PrimeTask t, Long n) throws RemoteException
    {
        return t.isPrime(n);
    }
 
    public static void main(String[] args) {
        try
        {
            String name = "PrimeCompute";
            PrimeCompute engine = new PrimeComputeEngine();
            PrimeCompute stub =
                    (PrimeCompute) UnicastRemoteObject.exportObject(engine, 0);
            Registry registry = LocateRegistry.getRegistry();
            registry.rebind(name, stub);
            System.out.println("ComputeEngine bound");
        } catch (Exception e) {
            System.err.println("ComputeEngine exception:");
            e.printStackTrace();
        }
    }
}

CLIENT CODE

    package client;
 
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
 
import server.PrimeTask;
 
class PrimeObject implements PrimeTask, Serializable
{
    private static final long serialVersionUID = 8954L;
 
    public PrimeObject()
    {
 
    }
 
    @Override
    public List<Long> getListofPrimes(Long n) {
        List<Long> temp = new ArrayList<Long>();
 
        for (long i = 1; i <= n; i++)
        {
            if (isPrime(i))
                temp.add(i);
        }
        return temp;
    }
 
    @Override
    public Boolean isPrime(Long n) {
        if (n == 1)
            return false;
        else if (n < 4)
            return true;
        else if (n % 2 == 0)
            return false;
        else if (n < 9)
            return true;
        else if (n % 3 == 0)
            return false;
        else
        {
            int r = (int)Math.floor(Math.sqrt(n));
            int f = 5;
            while (f <= r)
            {
                if (n % f == 0)
                    return false;
                if (n % (f + 2) == 0)
                    return false;
                f = f + 6;
            }
        }
        return true;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package client;
 
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.util.List;
 
import server.PrimeCompute;
import server.PrimeTask;
 
public class PrimeNumberUtils {
    public static void main(String[] args) {
        try {
            String name = "PrimeCompute";
            Registry registry = LocateRegistry.getRegistry(args[0]);
            PrimeCompute prime = (PrimeCompute) registry.lookup(name);
            PrimeTask o = new PrimeObject();
            Long l = 1000000L;
            List<Long> p = prime.getListofPrimes(o, l);
            for (Long n : p) {
                System.out.println(n);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

我编写了一个简单的 Java 客户端-服务器程序来生成素数。我想给它添加任务包功能,但不知道怎么做。问题描述如下:

- 任务包是一个对象,用作 Pairs 的存储库。一对可能包含执行任务所需的任务描述或数据。Pair 由两个字符串组成:键和值。键是值的任何唯一标识符,例如任务名称或编号。该值是实际的任务描述或数据。

-任务包上可以进行三种操作:添加一对。取出一对。确定一对的值而不删除它。

- 可以使用以下三种方法来执行这些操作: pairOut(key, value) – 将 Pair (key/value) 添加到 Task Bag 并允许调用者立即继续处理。pairIn(key) à value – 导致任务包中与键匹配的某些对从任务包中撤出。Pair 的值部分被返回,客户端进程继续。如果没有匹配的 Pair 可用,则向调用者返回一个空字符串。如果有多个匹配的 Pairs 可用,则任意选择一个。readPair(key) à value – 与 pairIn 相同,只是 Pair 不会从任务包中删除。

- Worker 进程可以在与 Task Master 相同的计算机上启动,也可以在不同的计算机上启动。可以处理任务包中任务的工人数量没有限制。每个 Worker 将从 Task Bag 中提取数据,指示应该完成的工作以及 Task Bag 中进行计算所需的数据。结果也将被发送回任务包。当有更多任务可用时,任务包会通知工作人员。这要求工人在启动时向任务包注册,在关闭时取消注册。

- 每个工人使用一个唯一的名称来标识自己。默认情况下,此名称是使用计算机的主机名获得的。用户还可以在启动 Worker 时使用 -n 选项手动指定名称(当 2 个 worker 在同一台计算机上执行时很有用)。

-为了让工人开始工作,任务袋中必须有一个可用的任务对。Task Master 负责将第一个 Task Pair 放入 Task Bag。当 Task Bag 收到一个 Task Pair 时,它会立即向所有空闲的 Worker 发送一个新任务可用的通知。第一个响应的空闲 Worker 将删除任务对并将其替换为下一个任务对,指示下一个需要完成的任务。如果另一个空闲的 Worker 在第一个 Worker 替换它之前也请求了 Task Pair,它将无法读取 Task Pair 并等待来自 Task Bag 的另一个 Task Pair 通知。这样一来,任务包中的任务对就不会超过一个。当没有更多任务要完成时(所有矩阵计算都已完成),最后一个 Worker 不会替换 Task Pair。

- 每个任务对的关键是“下一个任务”。对于矩阵乘法,任务对值将由一个值组成,该值指示需要计算的结果矩阵的行和列。无需强制对这些任务对进行排序,因此实施的系统将任务从左到右、从上到下放置。

标签: javaparallel-processingrmi

解决方案


推荐阅读