首页 > 解决方案 > JpaRepository 在保存到数据库时跳过了一些行

问题描述

我正在制作一种解析大文件并将结果保存在数据库中的方法。

该文件有 12_500_000 行长,但最后平均保存 11_400_000 行。

我将它分成 10_000 个日志文件,并在当时的 8 个线程中解析它们。

正在节省的服务:

@Service
public class RequestService implements InterfaceRequestService {

    private final IRequestRepository repository;

    public RequestService(IRequestRepository repository) {
        this.repository = repository;
    }

    @Transactional
    @Override
    public void saveAll(List<Request> requests) {

        for (Request r: requests) {
            repository.save(r);
        }

    }
}

和界面:

public interface InterfaceRequestService {

    void saveAll(List<Request> requests);
}

在线程中进行保存的类:

public class Importer implements Runnable{

    private final IRequestService service;

    int fileNumber;

    public Importer(IRequestService service, int fileNumber) {
        this.service = service;
        this.fileNumber = fileNumber;
    }

    @Override
    public void run() {
        try {
            service.saveAll(getRequestListFromFile("segment_directory/Log_segment_"+fileNumber+".txt"));
            try {
                Thread.sleep(10);
            }catch (Exception e){
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

以及从文件中获取实体列表的方法:

 public List<Request> getRequestListFromFile(String filename) throws IOException {

    BufferedReader reader = new BufferedReader(new FileReader(filename));

    List<Request> logs = new ArrayList<>();

    String line;


    while((line = reader.readLine())!=null){
        logs.add(getRequestEntityFromLog(line));
    }
    reader.close();

    return logs;
 }

最后,运行所有这些的方法:

public void scheduledDataSave() throws InterruptedException {
    int availableCores = Runtime.getRuntime().availableProcessors();
    String directory = "segment_directory";

    int filesInDirectory = Objects.requireNonNull(new File(directory).list()).length;

    ExecutorService executorService = Executors.newFixedThreadPool(availableCores);

    for ( int i = 1 ; i <= filesInDirectory ; i++ ) {
        executorService.submit( new Importer( service, i ) );
        if(  i % 8  == 0 ) {
            System.out.println("Sleeping");
            Thread.sleep( Duration.ofSeconds( 10 ).toMillis() ) ;
        }
    }

    executorService.shutdown();
    executorService.awaitTermination(20, TimeUnit.SECONDS);
}

有什么理由跳过某些行吗?

我检查了一下

更新:

分别运行文件,但它仍然没有保存所有行。

带有 UncaughtExceptionHandler 的代码

数据库行数

标签: javamultithreadinghibernatejpa

解决方案


推荐阅读