geode - Native Client - 执行连续查询时出现序列化异常
问题描述
我正在尝试使用 Apache Geode 设置一个简单的 Java <-> #C/.NET 概念证明,特别是使用 .NET 本机客户端测试连续查询功能。在 .NET 中使用常规查询可以正常工作,只有连续查询有问题。当我在连续查询对象上调用 Execute() 方法时遇到了我的问题。我得到的具体错误是
处理响应时收到未处理的消息类型 26,可能的序列化不匹配
我只是在缓存区域中存储简单的字符串,所以我有点惊讶我遇到了序列化问题。我已经尝试在两侧启用 PDX 序列化(并且在没有它的情况下运行),它似乎没有什么区别。有任何想法吗?
这是我的双方代码:
爪哇
启动服务器,放入一些数据,然后不断更新给定的缓存条目。
public class GeodePoc {
public static void main(String[] args) throws Exception {
ServerLauncher serverLauncher = new ServerLauncher.Builder().setMemberName("server1")
.setServerBindAddress("localhost").setServerPort(10334).set("start-locator", "localhost[20341]")
.set(ConfigurationProperties.LOG_LEVEL, "trace")
.setPdxReadSerialized(true)
.set(ConfigurationProperties.CACHE_XML_FILE, "cache.xml").build();
serverLauncher.start();
Cache c = CacheFactory.getAnyInstance();
Region<String, String> r = c.getRegion("example_region");
r.put("test1", "value1");
r.put("test2", "value2");
System.out.println("Cache server successfully started");
int i = 0;
while (true) {
r.put("test1", "value" + i);
System.out.println(r.get("test1"));
Thread.sleep(3000);
i++;
}
}
}
服务器缓存.xml
<?xml version="1.0" encoding="UTF-8"?>
<cache xmlns="http://geode.apache.org/schema/cache" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"
version="1.0">
<cache-server bind-address="localhost" port="40404"
max-connections="100" />
<pdx>
<pdx-serializer>
<class-name>org.apache.geode.pdx.ReflectionBasedAutoSerializer</class-name>
<parameter name="classes">
<string>java.lang.String</string>
</parameter>
</pdx-serializer>
</pdx>
<region name="example_region">
<region-attributes refid="REPLICATE" />
</region>
</cache>
.NET 客户端
public static void GeodeTest()
{
Properties<string, string> props = Properties<string, string>.Create();
props.Insert("cache-xml-file", "<path-to-cache.xml>");
CacheFactory cacheFactory = new CacheFactory(props)
.SetPdxReadSerialized(true).SetPdxIgnoreUnreadFields(true)
.Set("log-level", "info");
Cache cache = cacheFactory.Create();
cache.TypeRegistry.PdxSerializer = new ReflectionBasedAutoSerializer();
IRegion<string, string> region = cache.GetRegion<string, string>("example_region");
Console.WriteLine(region.Get("test2", null));
PoolManager pManager = cache.GetPoolManager();
Pool pool = pManager.Find("serverPool");
QueryService qs = pool.GetQueryService();
// Regular query example (works)
Query<string> q = qs.NewQuery<string>("select * from /example_region");
ISelectResults<string> results = q.Execute();
Console.WriteLine("Finished query");
foreach (string result in results)
{
Console.WriteLine(result);
}
// Continuous Query (does not work)
CqAttributesFactory<string, object> cqAttribsFactory = new CqAttributesFactory<string, object>();
ICqListener<string, object> listener = new CacheListener<string, object>();
cqAttribsFactory.InitCqListeners(new ICqListener<string, object>[] { listener });
cqAttribsFactory.AddCqListener(listener);
CqAttributes<string, object> cqAttribs = cqAttribsFactory.Create();
CqQuery<string, object> cquery = qs.NewCq<string, object>("select * from /example_region", cqAttribs, false);
Console.WriteLine(cquery.GetState());
Console.WriteLine(cquery.QueryString);
Console.WriteLine(">>> Cache query example started.");
cquery.Execute();
Console.WriteLine();
Console.WriteLine(">>> Example finished, press any key to exit ...");
Console.ReadKey();
}
.NET 缓存侦听器
public class CacheListener<TKey, TResult> : ICqListener<TKey, TResult>
{
public virtual void OnEvent(CqEvent<TKey, TResult> ev)
{
object val = ev.getNewValue() as object;
TKey key = ev.getKey();
CqOperation opType = ev.getQueryOperation();
string opStr = "DESTROY";
if (opType == CqOperation.OP_TYPE_CREATE)
opStr = "CREATE";
else if (opType == CqOperation.OP_TYPE_UPDATE)
opStr = "UPDATE";
Console.WriteLine("MyCqListener::OnEvent called with key {0}, op {1}.", key, opStr);
}
public virtual void OnError(CqEvent<TKey, TResult> ev)
{
Console.WriteLine("MyCqListener::OnError called");
}
public virtual void Close()
{
Console.WriteLine("MyCqListener::close called");
}
}
.NET 客户端缓存.xml
<client-cache
xmlns="http://geode.apache.org/schema/cache"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"
version="1.0">
<pool name="serverPool" subscription-enabled="true">
<locator host="localhost" port="20341"/>
</pool>
<region name="example_region">
<region-attributes refid="CACHING_PROXY" pool-name="serverPool" />
</region>
</client-cache>
解决方案
这最终成为我的一个简单的疏忽。为了使连续查询起作用,您必须在 Java 端包含 geode-cq 依赖项。我没有这样做,这导致了异常。
推荐阅读
- amazon-web-services - 在创建新集群时,kOps 1.19 正在为“节点”创建 3 个不同的实例组,而不是单个 ig
- python - Python 的异步终止信号处理(signal.signal)如何工作?
- c# - 范围滑块的逻辑: min(min, max) 有效,但 max(min, max) 无效
- c# - VS Code 调试器无法为 .NET 控制台应用程序“步进”
- jena - Apache Jena Fuseki 是否支持集群或自动缩放?
- javascript - 试图找到所有正则表达式匹配的索引,但有些被遗漏了
- python - 如何在 read_excel 执行期间将时间值作为字符串获取?
- python - Pandas DataFrame 中布尔行的矢量化“真”值范围
- python - Pandas 拆分列表列表系列以查找字数/行
- java - 如何使用 couchbase Java SDK 获取使用 Arrays.asList(remove()) 删除的元素详细信息?