用于动态块大小的Spring Batch自定义完成策略

我们有一个批处理作业,可将本地化的国家名称(即国家名称翻译为不同语言)从外部复制到我们的数据库中。这个想法是在1个块中处理单个国家/地区的所有本地化的国家名称(即第一个块-

安道尔的所有翻译,下一个块-

阿联酋的所有翻译,等等)。我们使用JdbcCursorItemReader读取外部数据和一些oracle分析功能来提供该国家/地区可用的翻译总数:

select country_code, language_code, localized_name, COUNT(1) OVER(PARTITION BY c_lng.country_code) as lng_count

from EXT_COUNTRY_LNG c_lng

order by c_lng.countty_code, c_lng.language_code

因此,按块分割此输入看起来很简单:在读取了其中指定的确切行数后停止块,lng_count然后从下一个读取的行开始一个新的行,但实际上似乎并不那么简单:(

首先要尝试的是自定义完成政策。但是问题是,它无权访问由ItemReader读者阅读的最后一项-

您应该将其明确地放在阅读器中的上下文中,并将其返回到策略中。不喜欢它,因为它需要其他阅读器修改/添加阅读器侦听器。此外,我不喜欢将同一项目来回序列化/反序列化。而且我觉得JobContext/

StepContext不是存放此类数据的好地方。

还有RepeatContext一个看起来更适合存放此类数据的地方,但我无法 轻松 访问它……

所以最后我们得到这样的解决方案:

@Bean(name = "localizedCountryNamesStep")

@JobScope

public Step insertCountryStep(

final StepBuilderFactory stepBuilderFactory,

final MasterdataCountryNameReader countryNameReader,

final MasterdataCountryNameProcessor countryNameProcessor,

final MasterdataCountryNameWriter writer) {

/* Use the same fixed-commit policy, but update it's chunk size dynamically */

final SimpleCompletionPolicy policy = new SimpleCompletionPolicy();

return stepBuilderFactory.get("localizedCountryNamesStep")

.<ExtCountryLng, LocalizedCountryName> chunk(policy)

.reader(countryNameReader)

.listener(new ItemReadListener<ExtCountryLng>() {

@Override

public void beforeRead() {

// do nothing

}

@Override

public void afterRead(final ExtCountryLng item) {

/* Update the cunk size after every read: consequent reads

inside the same country = same chunk do nothing since lngCount is always the same there */

policy.setChunkSize(item.getLngCount());

}

@Override

public void onReadError(final Exception ex) {

// do nothing

}

})

.processor(countryNameProcessor)

.writer(writer)

.faultTolerant()

.skip(RuntimeException.class)

.skipLimit(Integer.MAX_VALUE) // Batch does not support unlimited skip

.retryLimit(0) // this solution disables only retry, but not recover

.build();

}

它可以正常工作,需要最少的代码更改,但是对我来说还是有点难看。所以我想知道,当所有必需的信息已经在时可用,是否有另一种优雅的方法可以在Spring

Batch中执行动态块大小ItemReader

回答:

最简单的方法是按国家划分您的步骤。这样,每个国家都可以迈出自己的一步,您还可以跨国家穿梭,​​以提高绩效。

如果需要一个读者,则可以包装委托PeekableItemReader并扩展SimpleCompletionPolicy以实现您的目标。

public class CountryPeekingCompletionPolicyReader extends SimpleCompletionPolicy implements ItemReader<CountrySpecificItem> {

private PeekableItemReader<? extends CountrySpecificItem> delegate;

private CountrySpecificItem currentReadItem = null;

@Override

public CountrySpecificItem read() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {

currentReadItem = delegate.read();

return currentReadItem;

}

@Override

public RepeatContext start(final RepeatContext context) {

return new ComparisonPolicyTerminationContext(context);

}

protected class ComparisonPolicyTerminationContext extends SimpleTerminationContext {

public ComparisonPolicyTerminationContext(final RepeatContext context) {

super(context);

}

@Override

public boolean isComplete() {

final CountrySpecificItem nextReadItem = delegate.peek();

// logic to check if same country

if (currentReadItem.isSameCountry(nextReadItem)) {

return false;

}

return true;

}

}

}

然后根据您的上下文定义:

<batch:tasklet>

<batch:chunk chunk-completion-policy="countrySpecificCompletionPolicy" reader="countrySpecificCompletionPolicy" writer="someWriter" />

</batch:tasklet>

<bean id="countrySpecificCompletionPolicy" class="CountryPeekingCompletionPolicyReader">

<property name="delegate" ref="peekableReader" />

</bean>

<bean id="peekableReader" class="YourPeekableItemReader" />


仔细考虑您的问题,分区是最干净的方法。使用分区步骤,将从步骤执行上下文向每个ItemReader(确保scope="step")传递一个countryName。是的,您需要一个自定义Partitioner类来构建执行上下文图(每个国家/地区一个条目)和一个硬编码的提交间隔,该间隔足够大以容纳最大的工作单元,但是此后一切都非常简单,并且每个从属步骤仅是一个单独的步骤,对于可能遇到问题的任何国家,重新启动应该是一件轻而易举的事。

以上是 用于动态块大小的Spring Batch自定义完成策略 的全部内容, 来源链接: utcz.com/qa/404026.html

回到顶部