首页 > 解决方案 > 多线程读取导致 Cassandra 会话数据损坏

问题描述

我对 cassandra api 和选择查询有疑问。在单线程上下文中它工作得很好。但是,当多个线程使用同一个对象并调用该函数时,即使只有两个线程,cassandra 也会返回带有错误数据的未来(有时似乎被其他选择查询的数据覆盖,有时它只是垃圾)。我有一个单例对象处理我对 cassandra 的调用,还有一个字节缓冲区结构来保存返回的数据。我已经确定,在单个线程上,一切都按预期工作,但是当我使用相同的对象添加更多线程并调用相同的函数时,错误的数据会被引入从 cassandra 返回的数据中。

值大小约为 1kb,键为 32 字节。

您可以看到被注释掉的互斥锁包围的 2 行。如果未注释,这可以防止不正确的数据问题,但也会抵消增加线程数带来的任何性能提升。

此外,损坏查询的百分比约为 33%。

cassandra api 应该能够处理多个线程和会话连接而不会出现任何问题(根据http://datastax.github.io/cpp-driver/topics/),那么为什么我要取回坏数据?

我正在使用 Centos7、c++14 和 cassandra api 2.15 和 cassandra 3.11.4

#include <cassert> 
#include "DataObj.h"//contains the rest of my includes

//byteBuf has two members: a const uint8_t pointer, and a uint32_t variable for declaring the size
DataObj::byteBuf  DataObj::threadGet(DataObj::byteBuf key)
{
  CassError rc = CASS_OK;
  CassStatement* statement = NULL;
  CassFuture* future = NULL;                        
  string temp = "SELECT value FROM "+ keyspace+"."+tablename+" WHERE id = ?";// variables defined elsewhere
  const char* query = (const char*)temp.c_str();
  statement = cass_statement_new(query, 1);
 //I am also having issues with prepared statements not working properly, 
//but I believe it is not directly related to this question. This setup was working fine on 1 thread    
  cass_statement_bind_bytes(statement, 0, key.data, key.size);
  //rw_mut.lock();   
  future = cass_session_execute(m_session, statement);// m_session is the cassandra session of the object
  cass_future_wait(future);
  //rw_mut.unlock();
//The two statements above, when gated by the mutex, do not produce errors when multithreading,
// but also do not gain any performance.
// When not gated, works fine on a single thread, but corrupts the return data on 2 or more threads

  const  uint8_t* st=nullptr;
  size_t len=0;                 
  rc = cass_future_error_code(future);
  if (rc != CASS_OK)
  {
    //error handling...
  }
  else
  {
    const CassResult* result = cass_future_get_result(future);
    CassIterator* iterator = cass_iterator_from_result(result);                     
         if (cass_iterator_next(iterator))
          {                       
             const CassRow* row = cass_iterator_get_row(iterator);
             cass_value_get_bytes(cass_row_get_column_by_name(row, "value"), &st,&len);
          }                         
    cass_result_free(result);
    cass_iterator_free(iterator);
  }               
  DataObj::byteBuf  res((uint8_t *) st, len);
  //was able to use gdb here and confirm that the data is corrupt

  cass_future_free(future);      

  return res;
}  

标签: c++multithreadingcassandradatastax

解决方案


看起来您在从值指针复制到byteBuf. 在单线程版本中,您可能会很幸运,取消引用的内存仍然完好无损。多线程,你很可能会覆盖。


推荐阅读