首页 > 解决方案 > 使用 Parallel::ForkManager 并行执行命令

问题描述

我想知道我对以下脚本/逻辑的理解是否正确。

我有节点列表,我需要通过对服务器执行 SSH 来利用我拥有的服务器数量在每个节点上运行某个命令,这意味着该过程应该并行发生。

我有node_list.txt包含节点列表的文件:

node1
node2
.
.
node49
node50

我已经定义了一个阵列中的服务器数量,我应该在其中执行 SSH 并通过在可用服务器中将它们分成相等数量的部分(称为)@hosts来对每个节点执行命令。node_file.txt$node_list_X.txt

一旦我有了这些文件(node_list_1.txt, node_list_2.txt, node_list_3.txt, node_list_4.txt),我将登录到每个已经定义的服务器,并通过node_list_X.txt并行传递文件在每个主机上执行某些命令。

要并行执行此操作,我正在使用Parallel::ForkManagerPerl 模块。

因此,让我们在每个主机中说 -

192.168.0.1 -> node_list_1.txt (13 nodes)
192.168.0.2 -> node_list_2.txt (13 nodes)
192.168.0.3 -> node_list_3.txt (12 nodes)
192.168.0.4 -> node_list_4.txt (12 nodes)

将并行运行。

脚本如下:

...
my @hosts = ("192.168.0.1", "192.168.0.2", "192.168.0.3","192.168.0.4");

open(my $node_fh, '<', $node_file)
        or die "can't open $node_file: $!";

my @lines =  <$node_fh>;

my %Files;

my $num_buckets = scalar @hosts;

my $per_bucket = int( @lines / $num_buckets );
my $num_extras =      @lines % $num_buckets;
my $path = "/home/user/vinod/test/";

for my $bucket_num (0..$num_buckets-1) {
   my $num_lines = $per_bucket;
   if ($num_extras) {
      ++$num_lines;
      --$num_extras;
   }

   last if($num_lines == 0);
   my $qfn = $path."node_list_${bucket_num}.txt";
   open(my $fh, '>', $qfn)
      or die("Can't create \"$qfn\": $!\n");

   $fh->print(splice(@lines, 0, $num_lines));
   $Files{$bucket_num} = $qfn;
}
print Dumper(\%Files);

my $command = #"defining my command here";

my $pm = Parallel::ForkManager->new(5);
my $ssh;

DATA_LOOP:
foreach my $n (0..$num_buckets-1) {
    if( exists $Files{$n} ) {
        my $pid = $pm->start and next DATA_LOOP;

        $command_to_execute = $command." ".$Files{$n};
        $ssh = SSH_Connection( $hosts[$n-1], "user", "password" );
        $result = $ssh->capture($command_to_execute);
      
        $pm->finish;       
    }
}
$pm->wait_all_children;
undef $ssh;

#SSH Connect
sub SSH_Connection {
    my ( $host, $user, $passwd ) = @_;
    my $ssh = Net::OpenSSH->new($host,
                                user => $user,
                                password => $passwd,
                                master_opts => [-o => "StrictHostKeyChecking=no"]
    );
    $ssh->error and die "Couldn't establish SSH connection: ". $ssh->error;
    return $ssh;
}

在这里一切正常。

当我定义$pm对象时,并行进程设置为 5。

my $pm = Parallel::ForkManager->new(5);

这是否意味着在特定服务器(例如:192.168.0.1)中一次应该运行 5 个并行进程。意味着它应该从node_list_1.txt(13 个)文件中获取 5 个节点并执行命令?

我的理解正确吗?如果不是,那么在每个服务器中使用多线程并行运行命令的可能解决方案是什么?

标签: perlparallel-processingmultitasking

解决方案


这是否意味着在特定服务器(例如:192.168.0.1)中一次应该运行 5 个并行进程。

不。P::FM 对服务器一无所知。它管理进程,如果其中 5 个进程仍在执行,则->new(5)意味着->start将等待它创建的进程之一完成,然后再创建一个新进程。

在每个服务器中使用多线程并行运行命令的可能解决方案是什么?

假设您通常指的是多任务而不是专门的多线程(因为您没有使用线程),为每个主机创建一个进程可以按如下方式完成:

my %children;
my $error = 0;
for my $host (@hosts) {
    my $pid = fork();
    if (!defined($pid)) {
       warn("Can't execute on $host: Can't fork: $!\n");
       next;
    }

    if ($pid) {
       ++$children{$pid};
       next;
    }

    if (!eval {
       do_it($host);
       return 1;  # No exception
    }) {
       warn("Error executing commands on $host: $@");
    }
}

while (%children) {
   ( my $pid = wait() ) >= 0
      or die("Can't wait: $!\n");

   delete($children{$pid});   
}

推荐阅读