Apache Beam's StartBundle is throwing a strange error

Problem description

I am getting this error:

Caused by: java.lang.IllegalArgumentException: 
com.orderly.rosters.transforms.RosterFileReader$RosterFileReaderFn, @StartBundle start(StartBundleContext), parameter of type StartBundleContext at index 0: StartBundleContext argument must have type DoFn<String, List<String>>.ProcessContext

from the code below:

public abstract class OrderlyDoFn<INPUT, OUTPUT> extends DoFn<INPUT, OUTPUT> {
    protected Logger log = LoggerFactory.getLogger(getClass());
    private transient String projectId;
    private transient Map headers;

    @DoFn.StartBundle
    public void start(DoFn.StartBundleContext ctx) {
        OrderlyPipelineOptions options = (OrderlyPipelineOptions) ctx.getPipelineOptions();
        headers = PlatformMagic.unmarshal(options.getPlatformMagic().get(), Map.class);
    }

    @DoFn.ProcessElement
    public void processElement(@DoFn.Element INPUT elem, DoFn.OutputReceiver<OUTPUT> receiver) {
        try {
            RouterRequest routerReq = new RouterRequest();
            routerReq.requestState = headers;
            RequestContext ctx = new RequestContext(null, null, null, routerReq, null);
            Current.setContext(ctx);
            routerReq.requestState.put(DataflowClientFactory.PROJECT_KEY, projectId);

            for (OrderlyHeaders header : OrderlyHeaders.values()) {
                if (header.isLogged()) {
                    String value = (String) routerReq.requestState.get(header.getHeaderName());
                    MDC.put(header.getLoggerKey(), value);
                }
            }

            processElementImpl(elem, receiver);
        } catch (Throwable e) {
            log.info("Exception processing OrderlyDoFn", e);
            throw SneakyThrow.sneak(e);
        } finally {
            MDC.clear();
            Current.setContext(null); //clear context
        }
    }

    protected abstract void processElementImpl(INPUT elem, OutputReceiver<OUTPUT> receiver);

}

Tags: google-cloud-dataflow, apache-beam

Solution


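Oh, this fixed it. Qualify the context parameter with the DoFn's type arguments instead of using the raw DoFn.StartBundleContext: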

@DoFn.StartBundle
public void start(DoFn<INPUT, OUTPUT>.StartBundleContext ctx) {
    // body unchanged from the question
}
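A likely explanation for why the qualified form matters: StartBundleContext is a non-static inner class of the generic DoFn, so the raw DoFn.StartBundleContext and DoFn<INPUT, OUTPUT>.StartBundleContext are different types when inspected through reflection, and Beam appears to validate annotated method signatures that way. The sketch below reproduces the distinction without Beam on the classpath. Fn, BundleContext, Sub and firstParamType are hypothetical stand-ins invented for illustration, not Beam APIs.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class RawInnerTypeDemo {

    // Hypothetical stand-in for Beam's DoFn: a generic class with a non-static inner class.
    abstract static class Fn<I, O> {
        abstract class BundleContext {}
    }

    // Hypothetical stand-in for a generic DoFn subclass such as OrderlyDoFn.
    abstract static class Sub<I, O> extends Fn<I, O> {
        // Raw outer type: the type arguments of Fn are erased from the signature.
        public void rawStart(Fn.BundleContext ctx) {}

        // Parameterized outer type: the signature records Fn<I, O> as the owner.
        public void typedStart(Fn<I, O>.BundleContext ctx) {}
    }

    // Returns the generic type of the first parameter of the named method on Sub.
    public static Type firstParamType(String methodName) {
        for (Method m : Sub.class.getDeclaredMethods()) {
            if (m.getName().equals(methodName)) {
                return m.getGenericParameterTypes()[0];
            }
        }
        throw new IllegalArgumentException("No such method: " + methodName);
    }

    public static void main(String[] args) {
        Type raw = firstParamType("rawStart");
        Type typed = firstParamType("typedStart");
        System.out.println("raw is a plain Class: " + (raw instanceof Class));
        System.out.println("typed is a ParameterizedType: " + (typed instanceof ParameterizedType));
        System.out.println("types are equal: " + raw.equals(typed));
    }
}
```

A framework that compares the declared parameter type against the expected parameterized type would accept typedStart and reject rawStart, which matches the IllegalArgumentException in the question.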
