首页 > 解决方案 > 在 Kubernetes 上运行 Apache Ignite 的指南 - 服务器模式

问题描述

我的一个后端服务使用 Ignite on Server 模式,具有持久性,完全复制,在 Kubernetes 中运行。我尝试了网站中提供的文档以及此处的示例。应用程序 pod 启动,但应用程序实例不相互连接或相互之间没有复制数据。

部分错误

Caused by: java.io.FileNotFoundException: 

https://kubernetes.default.svc.cluster.local:443/api/v1/namespaces/default/endpoints/ignite
 at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(Unknown Source) ~[na:na]
 at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(Unknown Source) ~[na:na]
 at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getInputStream(Unknown Source) ~[na:na]
 at org.apache.ignite.internal.kubernetes.connection.KubernetesServiceAddressResolver.getServiceAddresses(KubernetesServiceAddressResolver.java:109) ~[ignite-kubernetes-2.11.0.jar:2.11.0]
 ... 90 common frames omitted
2021-10-23 05:52:31.895 ERROR 1 --- [           main] o.a.i.spi.discovery.tcp.TcpDiscoverySpi  : Failed to get registered addresses from IP finder (retrying every 2000ms; change 'reconnectDelay' to configure the frequency of retries) [maxTimeout=0]
org.apache.ignite.spi.IgniteSpiException: Failed to retrieve Ignite pods IP addresses.
 at org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes.TcpDiscoveryKubernetesIpFinder.getRegisteredAddresses(TcpDiscoveryKubernetesIpFinder.java:78) ~[ignite-kubernetes-2.11.0.jar:2.11.0]
 at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.registeredAddresses(TcpDiscoverySpi.java:2052) ~[ignite-core-2.11.0.jar:2.11.0]
 at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.resolvedAddresses(TcpDiscoverySpi.java:1987) ~[ignite-core-2.11.0.jar:2.11.0]
 at org.apache.ignite.spi.discovery.tcp.ServerImpl.sendJoinRequestMessage(ServerImpl.java:1293) ~[ignite-core-2.11.0.jar:2.11.0]
 at org.apache.ignite.spi.discovery.tcp.ServerImpl.joinTopology(ServerImpl.java:1121) ~[ignite-core-2.11.0.jar:2.11.0]
 at org.apache.ignite.spi.discovery.tcp.ServerImpl.spiStart(ServerImpl.java:473) ~[ignite-core-2.11.0.jar:2.11.0]
 at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.spiStart(TcpDiscoverySpi.java:2207) ~[ignite-core-2.11.0.jar:2.11.0]
 at org.apache.ignite.internal.managers.GridManagerAdapter.startSpi(GridManagerAdapter.java:278) ~[ignite-core-2.11.0.jar:2.11.0]
 at org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.start(GridDiscoveryManager.java:980) ~[ignite-core-2.11.0.jar:2.11.0]
 at org.apache.ignite.internal.IgniteKernal.startManager(IgniteKernal.java:1985) ~[ignite-core-2.11.0.jar:2.11.0]
 at org.apache.ignite.internal.IgniteKernal.start(IgniteKernal.java:1331) ~[ignite-core-2.11.0.jar:2.11.0]
 at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start0(IgnitionEx.java:2141) ~[ignite-core-2.11.0.jar:2.11.0]
 at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start(IgnitionEx.java:1787) ~[ignite-core-2.11.0.jar:2.11.0]
 at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1172) ~[ignite-core-2.11.0.jar:2.11.0]
 at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:668) ~[ignite-core-2.11.0.jar:2.11.0]
 at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:590) ~[ignite-core-2.11.0.jar:2.11.0]

和另一个

    2021-10-23 05:47:30.579 ERROR 1 --- [           main] o.a.i.spi.discovery.tcp.TcpDiscoverySpi  : Failed to get registered addresses from IP finder (retrying every 2000ms; change 'reconnectDelay' to configure the frequency of retries) [maxTimeout=0]
org.apache.ignite.spi.IgniteSpiException: Failed to retrieve Ignite pods IP addresses.
    at org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes.TcpDiscoveryKubernetesIpFinder.getRegisteredAddresses(TcpDiscoveryKubernetesIpFinder.java:78) ~[ignite-kubernetes-2.11.0.jar:2.11.0]
    at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.registeredAddresses(TcpDiscoverySpi.java:2052) ~[ignite-core-2.11.0.jar:2.11.0]
    at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.resolvedAddresses(TcpDiscoverySpi.java:1987) ~[ignite-core-2.11.0.jar:2.11.0]
    at org.apache.ignite.spi.discovery.tcp.ServerImpl.sendJoinRequestMessage(ServerImpl.java:1293) ~[ignite-core-2.11.0.jar:2.11.0]
    at org.apache.ignite.spi.discovery.tcp.ServerImpl.joinTopology(ServerImpl.java:1121) ~[ignite-core-2.11.0.jar:2.11.0]
    at org.apache.ignite.spi.discovery.tcp.ServerImpl.spiStart(ServerImpl.java:473) ~[ignite-core-2.11.0.jar:2.11.0]
    at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.spiStart(TcpDiscoverySpi.java:2207) ~[ignite-core-2.11.0.jar:2.11.0]
    at org.apache.ignite.internal.managers.GridManagerAdapter.startSpi(GridManagerAdapter.java:278) ~[ignite-core-2.11.0.jar:2.11.0]
    at org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.start(GridDiscoveryManager.java:980) ~[ignite-core-2.11.0.jar:2.11.0]
    at org.apache.ignite.internal.IgniteKernal.startManager(IgniteKernal.java:1985) ~[ignite-core-2.11.0.jar:2.11.0]
    at org.apache.ignite.internal.IgniteKernal.start(IgniteKernal.java:1331) ~[ignite-core-2.11.0.jar:2.11.0]
    at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start0(IgnitionEx.java:2141) ~[ignite-core-2.11.0.jar:2.11.0]
    at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start(IgnitionEx.java:1787) ~[ignite-core-2.11.0.jar:2.11.0]
    at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1172) ~[ignite-core-2.11.0.jar:2.11.0]
    at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:668) ~[ignite-core-2.11.0.jar:2.11.0]
    at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:590) ~[ignite-core-2.11.0.jar:2.11.0]
    at org.apache.ignite.Ignition.start(Ignition.java:328) ~[ignite-core-2.11.0.jar:2.11.0]

下面是基于 Java 的 Ignite 配置

package myapp;

import java.util.Collections;
import java.util.UUID;
import java.util.stream.Stream;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.kubernetes.configuration.KubernetesConnectionConfiguration;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes.TcpDiscoveryKubernetesIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import lombok.extern.slf4j.Slf4j;

@Configuration
@Slf4j
public class IgniteConfig {

    @Bean
  public IgniteConfiguration igniteConfiguration(CacheConfiguration<String, State>[] cacheConfiguration,
        DataStorageConfiguration storageCfg, TcpDiscoverySpi discoverySpi) {
    IgniteConfiguration cfg = new IgniteConfiguration();
    cfg.setIgniteInstanceName("schedules-cache-instance-" + UUID.randomUUID().toString());
    cfg.setCacheConfiguration(cacheConfiguration);
    cfg.setDataStorageConfiguration(storageCfg);
    cfg.setClientMode(false);
    cfg.setDiscoverySpi(discoverySpi);
    return cfg;
}

    @Bean
    // creating set of caches before hand
    public CacheConfiguration<String, State>[] cacheConfig(IgniteProps igniteProps) {
        return Stream.of(Cache.values()).map(c -> {
            CacheConfiguration<String, Schedule> cacheCfg = new CacheConfiguration<String, Schedule>(c.getName());
            cacheCfg.setRebalanceDelay(0);
            cacheCfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
            cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
            cacheCfg.setCacheMode(CacheMode.REPLICATED);
            cacheCfg.setGroupName("schedules");
            return cacheCfg;
        }).toArray(c -> new CacheConfiguration[c]);
    }

    @Bean
    public TcpDiscoverySpi discovery(IgniteProps igniteProps) {
        TcpDiscoverySpi discoverySpi = null;
        if (igniteProps.isKubernetesDeployment()) {
            discoverySpi = kubernetesDiscovery(igniteProps);
        } else {
            discoverySpi = localDiscovery();
        }
        return discoverySpi;
    }

    private TcpDiscoverySpi kubernetesDiscovery(IgniteProps igniteProps) {
        log.info("++ creating k8s based discovery, {}", igniteProps);
        TcpDiscoverySpi spi = new TcpDiscoverySpi();
        TcpDiscoveryKubernetesIpFinder k8sIpFinder = new TcpDiscoveryKubernetesIpFinder();
        KubernetesConnectionConfiguration kubernetesConnectionConfiguration = new KubernetesConnectionConfiguration();
        kubernetesConnectionConfiguration.setNamespace("default");
        kubernetesConnectionConfiguration.setServiceName("state-manager");
        spi.setIpFinder(k8sIpFinder);
        log.info("++ creating k8s based discovery, {}", spi);
        return spi;
    }

    private TcpDiscoverySpi localDiscovery() {
        TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
        discoverySpi.setLocalPort(48500);
        TcpDiscoveryVmIpFinder firstIpFinder = new TcpDiscoveryVmIpFinder();
        firstIpFinder.setAddresses(Collections.singletonList("127.0.0.1:48500..48520"));
        discoverySpi.setIpFinder(firstIpFinder);
        return discoverySpi;
    }

    @Bean
    public DataStorageConfiguration storageConfig(IgniteProps igniteProps) {
        DataStorageConfiguration storageCfg = new DataStorageConfiguration();
        storageCfg.getDefaultDataRegionConfiguration().setPersistenceEnabled(true);
        storageCfg.setStoragePath(igniteProps.getStoragePath());
        if (igniteProps.isWALenabled()) {
            storageCfg.setWalMode(WALMode.FSYNC);
            storageCfg.setWalPath(igniteProps.getWalStoragePath());
        }
        return storageCfg;
    }

    @Bean
    public Ignite ignite(IgniteConfiguration igniteConfiguration) {
        Ignite ignite = Ignition.start(igniteConfiguration);
        ignite.cluster().baselineAutoAdjustTimeout(15000);
        IgniteEvents events = ignite.events(ignite.cluster().forCacheNodes("schedules"));
        events.localListen(new IgnitePredicate<DiscoveryEvent>() {

            @Override
            public boolean apply(DiscoveryEvent e) {
                ignite.cluster().baselineAutoAdjustEnabled(false);
                log.info(">> new node joined {} {} {} - {}", e.name(), e.eventNode().id(), e.message(), e.localOrder());
                ignite.cluster().state(ClusterState.ACTIVE);
                return true;
            }

        }, EventType.EVT_NODE_JOINED);
        events.localListen(new IgnitePredicate<DiscoveryEvent>() {

            @Override
            public boolean apply(DiscoveryEvent e) {
                ignite.cluster().baselineAutoAdjustEnabled(false);
                log.info(">> node left {} {} {} - {}", e.name(), e.eventNode().id(), e.message(), e.localOrder());
                ignite.cluster().state(ClusterState.ACTIVE);
                return true;
            }

        }, EventType.EVT_NODE_JOINED);
        ignite.cluster().baselineAutoAdjustEnabled(true);
        ignite.cluster().state(ClusterState.ACTIVE);
        return ignite;
    }

}

配置中缺少什么?

标签: ignite

解决方案


推荐阅读