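java - Why do these threads read the whole file instead of stopping at maxItemCount?
Problem description
I am using Spring Batch with multithreading. My process receives a large txt file (> 100k lines). I want each thread to take X lines of the file and run the processing on them, in order to save time.
I am using the FlatFileItemReader class and passing the first and last line to each thread. I am testing with 19k lines and with 3 and 4 threads. I cannot see why the first thread starts and stops at the correct lines, while the remaining threads start at the correct line but do not stop at the correct one and instead read the file to the end.
I have read many questions on this topic and about the FlatFileItemReader class, which is not thread-safe. I thought my problem was there, but I set saveState to false and specify the lines for each thread. (See this.)
These are the XML configuration file and the RangePartitioner file (the steps setTiempoInicial and setTiempoFinal only print the start time and the total time):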
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
    <batch:job id="superTxtTest">
        <batch:step id="setTiempoInicial" next="validacionDePers">
            <batch:tasklet transaction-manager="transactionManager"
                start-limit="1">
                <batch:chunk reader="tiempoInicialReader" writer="tiempoInicialWriter"
                    commit-interval="1" skip-limit="1">
                    <batch:skippable-exception-classes>
                        <batch:include
                            class="com.testpartitionfile.batch.PersException" />
                    </batch:skippable-exception-classes>
                </batch:chunk>
            </batch:tasklet>
            <batch:listeners>
                <batch:listener ref="promotionListener" />
            </batch:listeners>
        </batch:step>
        <batch:step id="validacionDePers" next="setTiempoFinal">
            <batch:partition step="validacionDePersSlave" partitioner="rangePartitioner">
                <batch:handler grid-size="3" task-executor="taskExecutor" />
            </batch:partition>
        </batch:step>
        <batch:step id="setTiempoFinal">
            <batch:tasklet transaction-manager="transactionManager"
                start-limit="1">
                <batch:chunk reader="tiempoFinalReader" writer="tiempoFinalWriter"
                    commit-interval="1" skip-limit="1">
                    <batch:skippable-exception-classes>
                        <batch:include
                            class="com.testpartitionfile.batch.PersException" />
                    </batch:skippable-exception-classes>
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
    </batch:job>
    <batch:step id="validacionDePersSlave">
        <batch:tasklet transaction-manager="transactionManager"
            start-limit="1">
            <batch:chunk reader="tratamientoPersReader" writer="validacionPersWriter"
                commit-interval="1" skip-limit="1">
                <batch:skippable-exception-classes>
                    <batch:include
                        class="com.testpartitionfile.batch.PersException" />
                </batch:skippable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
    <bean id="promotionListener"
        class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
        <property name="keys" value="tiempoInicial" />
    </bean>
    <bean id="validacionPersWriter"
        class="com.testpartitionfile.batch.writer.ValidacionPersWriter"
        scope="step">
        <property name="threadName" value="#{stepExecutionContext[name]}" />
    </bean>
    <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
    <bean id="rangePartitioner"
        class="com.testpartitionfile.batch.partitioner.RangePartitioner" />
    <bean id="tratamientoPersReader" class="org.springframework.batch.item.file.FlatFileItemReader"
        scope="step">
        <property name="resource"
            value="file:C:\Users\user\Desktop\testFile.txt" />
        <property name="encoding" value="utf8" />
        <property name="lineMapper" ref="ficheroPersMapper" />
        <property name="linesToSkip" value="#{stepExecutionContext[fromId]}" />
        <property name="maxItemCount" value="#{stepExecutionContext[toId]}" />
        <property name="saveState" value="false" />
    </bean>
    <bean id="ficheroPersMapper"
        class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
        <property name="fieldSetMapper" ref="ficheroPersSetMapper" />
        <property name="lineTokenizer">
            <bean
                class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                <property name="delimiter" value=";" />
                <property name="names"
                    value="a,b,c,d,e,f,g,h,i,j" />
            </bean>
        </property>
    </bean>
    <bean id="ficheroPersSetMapper"
        class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
        <property name="prototypeBeanName" value="fichero" />
    </bean>
    <bean id="fichero" class="com.testpartitionfile.batch.dto.Fichero"
        scope="prototype" />
</beans>
RangePartitioner:
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

public class RangePartitioner implements Partitioner {
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        int range = 1;
        // Count the newlines in the input file to size each partition.
        // try-with-resources closes the stream on success as well as on failure.
        try (InputStream is = new BufferedInputStream(
                new FileInputStream("C:\\Users\\user\\Desktop\\testFile.txt"))) {
            byte[] c = new byte[1024];
            int count = 0;
            int readChars = 0;
            while ((readChars = is.read(c)) != -1) {
                for (int i = 0; i < readChars; ++i) {
                    if (c[i] == '\n') {
                        ++count;
                    }
                }
            }
            if (count != 0) {
                range = (count / gridSize);
            }
        } catch (Exception e) {
            // Counting failed; range keeps its default value of 1.
        }
        int fromId = 1;
        int toId = range;
        for (int i = 1; i <= gridSize; i++) {
            ExecutionContext value = new ExecutionContext();
            System.out.println("\nHilo : " + i);
            System.out.println("Index Inicial : " + fromId);
            System.out.println("Index Final : " + toId + "\n");
            // fromId - 1 is bound to linesToSkip, toId to maxItemCount (see the XML).
            value.putInt("fromId", fromId - 1);
            value.putInt("toId", toId);
            // give each thread a name, thread 1,2,3
            value.putString("name", "Hilo " + i);
            result.put("partition" + i, value);
            fromId = toId + 1;
            toId += range;
            // Give the last partition one extra line.
            if (i == gridSize - 1) toId = toId + 1;
        }
        return result;
    }
}
Initial log with 3 threads:
Thread: 1
Initial Index: 1
Final Index: 6333
Thread: 2
Initial Index: 6334
Final Index: 12666
Thread: 3
Initial Index: 12667
Final Index: 19000
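Case 1:
Thread 1 reads up to line 6333. Thread 2 reads up to line 19000 instead of stopping at 12666, and thread 3 reads up to line 19000.
Case 2:
With 4 threads, threads 2 and 3 read up to line 19000.
Why do thread 2 in case 1 and threads 2 and 3 in case 2 read from the initial line (correct) to the end of the file (incorrect)?
Update 25/07/2021
Issue at: https://github.com/spring-projects/spring-batch/issues/805
Solution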
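One plausible contributor to the behavior in case 1, offered as a hedged reading rather than a confirmed diagnosis of the linked issue: in FlatFileItemReader, the lines named by linesToSkip are discarded when the reader opens and do not count as items, while maxItemCount caps the number of items the reader returns, not an absolute line number. With linesToSkip = 6333 and maxItemCount = 12666, the second partition would skip 6333 lines and could then read up to 12666 more, which carries it to about the end of a 19000-line file. Under that assumption, each partition would need the size of its own slice as maxItemCount. The sketch below is plain Java with no Spring dependency; the class PartitionRanges, its Range type, and the split method are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionRanges {

    /** One partition: how many lines to skip, then how many items to read. */
    public static final class Range {
        public final int linesToSkip;
        public final int maxItemCount;

        Range(int linesToSkip, int maxItemCount) {
            this.linesToSkip = linesToSkip;
            this.maxItemCount = maxItemCount;
        }

        /** 1-based number of the last line this partition would read. */
        public int lastLine() {
            return linesToSkip + maxItemCount;
        }
    }

    /**
     * Splits totalLines into gridSize contiguous ranges.
     * The last range absorbs any remainder of the integer division.
     */
    public static List<Range> split(int totalLines, int gridSize) {
        if (totalLines < 0 || gridSize <= 0) {
            throw new IllegalArgumentException("totalLines must be >= 0 and gridSize > 0");
        }
        List<Range> ranges = new ArrayList<>();
        int size = totalLines / gridSize;
        int skipped = 0;
        for (int i = 1; i <= gridSize; i++) {
            // Slice size, not the absolute end line, is what would go into maxItemCount.
            int count = (i == gridSize) ? totalLines - skipped : size;
            ranges.add(new Range(skipped, count));
            skipped += count;
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (Range r : split(19000, 3)) {
            System.out.println("linesToSkip=" + r.linesToSkip
                    + " maxItemCount=" + r.maxItemCount
                    + " lastLine=" + r.lastLine());
        }
    }
}
```

For 19000 lines and 3 partitions this yields slices ending at lines 6333, 12666 and 19000, matching the indexes in the log. In the partitioner from the question, the equivalent change would be to store the slice size in the ExecutionContext under a separate key (for example a hypothetical count key) and bind maxItemCount to that key instead of toId.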