c++ - 将 mmap 内存用于开销非常低的循环缓冲区
问题描述
我有一个调试工具,为了注册其获取的数据,我使用了一个名为DiskPool
(代码如下)的数据结构。开始时,此数据结构mmap
包含一定数量的数据(由磁盘上的文件支持)。客户端可以通过简单的凹凸指针机制分配内存(使用std::atomic<size_t>
.
由于获取的数据量很大,我决定在一段时间内设置一个窗口,而不是注册和保留所有数据。为了实现这样的目的,我必须将磁盘池更改为循环缓冲区,但这不应该带来相当大的开销,因为这种开销会影响测量。
我想问你是否有人有任何想法?(例如,使用 STL 的原子接口)。
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <atomic>
#include <memory>
#include <signal.h>
#include <chrono>
#include <thread>
#define handle_error(msg) \
do { perror(msg); exit(EXIT_FAILURE); } while (0)
class DiskPool {
char* addr_; // Initialized by mmap()
size_t len_; // Given by the user as many as memory pages as needed
std::atomic<size_t> top_; // Offset from address_
int fd_;
public:
DiskPool(size_t l, const char* file) : len_(l), top_(0),fd_(-1)
{
struct stat st;
fd_= open(file, O_CREAT|O_RDWR, S_IREAD | S_IWRITE);
if (fd_ == -1)
handle_error("open");
if (ftruncate(fd_, len_* sysconf(_SC_PAGE_SIZE)) != 0)
handle_error("ftruncate() error");
else {
fstat(fd_, &st);
printf("the file has %ld bytes\n", (long) st.st_size);
}
addr_ = static_cast<char*>( mmap(NULL, (len_* sysconf(_SC_PAGE_SIZE)),
PROT_READ | PROT_WRITE, MAP_SHARED|MAP_NORESERVE, fd_,0));
if (addr_ == MAP_FAILED)
handle_error("mmap failed.");
}
~DiskPool()
{
close(fd_);
if( munmap(addr_, len_)< 0) {
handle_error("Could not unmap file");
exit(1);}
std::cout << "Successfully unmapped the file. " << std::endl;
}
void* allocate(size_t s)
{
size_t t = std::atomic_fetch_add(&top_, s);
return addr_+t;
}
void flush() {madvise(addr_, len_, MADV_DONTNEED);}
};
例如,我创建了示例代码,该代码使用此磁盘池在创建和销毁对象 ( AutomaticLifetimeCollector
) 时记录数据。
static const std::string RECORD_FILE = "Data.txt";
static const size_t DISK_POOL_NUMBER_OF_PAGES = 10000;
static std::shared_ptr<DiskPool> diskPool =
std::shared_ptr <DiskPool> (new DiskPool(DISK_POOL_NUMBER_OF_PAGES,RECORD_FILE.c_str()));
struct TaskRecord
{
uint64_t tid; // Thread id
uint64_t tag; // User-given identifier (“f1”)
uint64_t start_time; // nanoseconds
uint64_t stop_time;
uint64_t cpu_time;
TaskRecord(int depth, size_t tag, uint64_t start_time) :
tid(pthread_self()), tag(tag),
start_time(start_time), stop_time(0), cpu_time(0) {}
};
class AutomaticLifetimeCollector
{
TaskRecord* record_;
public:
AutomaticLifetimeCollector(size_t tag) :
record_(new(diskPool->allocate(sizeof(TaskRecord)))
TaskRecord(2, tag, (uint64_t)1000000004L))
{
}
~AutomaticLifetimeCollector() {
record_->stop_time = (uint64_t)1000000000L;
record_->cpu_time = (uint64_t)1000000002L;
}
};
inline void DelayMilSec(unsigned int pduration)
{
std::this_thread::sleep_until(std::chrono::system_clock::now() +
std::chrono::milliseconds(pduration));
}
std::atomic<bool> LoopsRunFlag {true};
void sigIntHappened(int signal)
{
std::cout<< "Application was terminated.";
LoopsRunFlag.store(false, std::memory_order_release);
}
int main()
{
signal(SIGINT, sigIntHappened);
unsigned int i = 0;
while(LoopsRunFlag)
{
AutomaticLifetimeCollector alc(i++);
DelayMilSec(2);
}
diskPool->flush();
return(0);
}
解决方案
因此,仅考虑为可变缓冲区分配可变大小的切片,我相信 Compare-And-Swap 循环应该可以工作。
这里的基本思想是读取一个值(这是原子的),对其进行一些计算,然后写入该值,如果它在读取后没有改变。如果它确实发生了变化(另一个线程/进程),则必须使用新值重做计算。
由于您有可变大小的对象,我认为实际上简单地将其切成 n 个数组元素是(i + 1) % n
行不通的,因为(i + item_len) % capacity
它会在缓冲区的结束和开始之间拆分分配,虽然这可能是正确且有效的,但我想想也许不是你想要的。所以这意味着一个条件,但我认为 CPU 应该可以很好地预测它。
#include <iostream>
#include <atomic>
std::atomic<size_t> next_index = 0;
const size_t len = 100; // small for demo purpose
size_t alloc(size_t required_size)
{
if (required_size > len) std::terminate(); // do something, would cause a buffer overflow
size_t i, ret_index, new_index;
i = next_index.load();
do
{
auto space = len - i;
ret_index = required_size <= space ? i : 0; // Wrap if needed
new_index = ret_index + required_size;
} while (next_index.compare_exchange_weak(i, new_index)); // succeed if value did of i not change
return ret_index;
}
int main()
{
std::cout << alloc(4) << std::endl; // 0 - 3
std::cout << alloc(8) << std::endl; // 4 - 11
std::cout << alloc(32) << std::endl; // 12 - 43
std::cout << alloc(32) << std::endl; // 44 - 75
std::cout << alloc(32) << std::endl; // 0 - 31 (76 - 107 would overflow)
std::cout << alloc(32) << std::endl; // 32 - 63
std::cout << alloc(32) << std::endl; // 64 - 95
std::cout << alloc(32) << std::endl; // 0 - 31 (96 - 127 would overflow)
}
插入您的课程应该相当简单:
void* allocate(size_t s)
{
if (s > len_ * sysconf(_SC_PAGE_SIZE)) std::terminate(); // do something, would cause a buffer overflow
size_t i, ret_index, new_index;
i = top_.load();
do
{
auto space = len_ * sysconf(_SC_PAGE_SIZE) - i;
ret_index = s <= space ? i : 0; // Wrap if needed
new_index = ret_index + s;
} while (top_.compare_exchange_weak(i, new_index)); // succeed if value did of i not change
return addr_+ ret_index;
}
len_ * sysconf(_SC_PAGE_SIZE)
在一些地方,所以可能是更有用的价值来存储len_
本身。
推荐阅读
- java - RecyclerView 未加载
- plugins - Gimp:如何批量调整多个尺寸的一张图像并导出它们?
- node.js - 带有 child_process NodeJS 的 JavaScript 事件源
- c# - 使用我的 WPF GUI 冻结的 Powershell 脚本无法找出原因
- python - AWS Canary Selenium 用户代理字符串
- python - 如何打印一个数组中相对于另一个数组中的值的值
- android - 无法安装 SDK 工具,完成按钮被禁用
- git - 如何修复 git 错误:RPC 失败;HTTP 520 卷曲 22?
- powershell - 导出 CSV 文件的格式与实际命令不同
- discord.py - 当我们不是服务器的所有者时,我们如何添加机器人?