google-cloud-dataflow - apache Beam 的 StartBundle 正在提交奇怪的错误
问题描述
我收到此错误
Caused by: java.lang.IllegalArgumentException:
com.orderly.rosters.transforms.RosterFileReader$RosterFileReaderFn, @StartBundle start(StartBundleContext), parameter of type StartBundleContext at index 0: StartBundleContext argument must have type DoFn<String, List<String>>.ProcessContext
在下面的代码
public abstract class OrderlyDoFn<INPUT, OUTPUT> extends DoFn<INPUT, OUTPUT> {
protected Logger log = LoggerFactory.getLogger(getClass());
private transient String projectId;
private transient Map headers;
@DoFn.StartBundle
public void start(DoFn.StartBundleContext ctx) {
OrderlyPipelineOptions options = (OrderlyPipelineOptions) ctx.getPipelineOptions();
headers = PlatformMagic.unmarshal(options.getPlatformMagic().get(), Map.class);
}
@DoFn.ProcessElement
public void processElement(@DoFn.Element INPUT elem, DoFn.OutputReceiver<OUTPUT> receiver) {
try {
RouterRequest routerReq = new RouterRequest();
routerReq.requestState = headers;
RequestContext ctx = new RequestContext(null, null, null, routerReq, null);
Current.setContext(ctx);
routerReq.requestState.put(DataflowClientFactory.PROJECT_KEY, projectId);
for (OrderlyHeaders header : OrderlyHeaders.values()) {
if (header.isLogged()) {
String value = (String) routerReq.requestState.get(header.getHeaderName());
MDC.put(header.getLoggerKey(), value);
}
}
processElementImpl(elem, receiver);
} catch (Throwable e) {
log.info("Exception processing OrderlyDoFn", e);
throw SneakyThrow.sneak(e);
} finally {
MDC.clear();
Current.setContext(null); //clear context
}
}
protected abstract void processElementImpl(INPUT elem, OutputReceiver<OUTPUT> receiver);
}
解决方案
哦,这解决了它
@DoFn.StartBundle
public void start(DoFn<INPUT, OUTPUT>.StartBundleContext ctx) {