// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future) /** * An implementation of the 'AsyncFunction' that sends requests and sets the callback. */ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> { /** The database specific client that can issue concurrent requests with callbacks */ private transient DatabaseClient client; @Override public void open(Configuration parameters) throws Exception { client = new DatabaseClient(host, port, credentials); } @Override public void close() throws Exception { client.close(); } @Override public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception { // issue the asynchronous request, receive a future for result final Future<String> result = client.query(key); // set the callback to be executed once the request by the client is complete // the callback simply forwards the result to the result future CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { return result.get(); } catch (InterruptedException | ExecutionException e) { // Normally handled explicitly. 
return null; } } }).thenAccept( (String dbResult) -> { resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult))); }); } } // create the original stream DataStream<String> stream = ...; // apply the async I/O transformation DataStream<Tuple2<String, String>> resultStream = AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100); 本实例展示了flink Async I/O的基本用法,首先是实现AsyncFunction接口,用于编写异步请求逻辑及将结果或异常设置到resultFuture,然后就是使用AsyncDataStream的unorderedWait或orderedWait方法将AsyncFunction作用到DataStream作为transformation;AsyncDataStream的unorderedWait或orderedWait有两个关于async operation的参数,一个是timeout参数用于设置async的超时时间,一个是capacity参数用于指定同一时刻最大允许多少个(并发)async request在执行 AsyncFunction flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/AsyncFunction.java /** * A function to trigger Async I/O operation. * * <p>For each #asyncInvoke, an async io operation can be triggered, and once it has been done, * the result can be collected by calling {@link ResultFuture#complete}. For each async * operation, its context is stored in the operator immediately after invoking * #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full. * * <p>{@link ResultFuture} can be passed into callbacks or futures to collect the result data. * An error can also be propagate to the async IO operator by * {@link ResultFuture#completeExceptionally(Throwable)}. 
* * <p>Callback example usage: * * <pre>{@code * public class HBaseAsyncFunc implements AsyncFunction<String, String> { * * public void asyncInvoke(String row, ResultFuture<String> result) throws Exception { * HBaseCallback cb = new HBaseCallback(result); * Get get = new Get(Bytes.toBytes(row)); * hbase.asyncGet(get, cb); * } * } * }</pre> * * <p>Future example usage: * * <pre>{@code * public class HBaseAsyncFunc implements AsyncFunction<String, String> { * * public void asyncInvoke(String row, final ResultFuture<String> result) throws Exception { * Get get = new Get(Bytes.toBytes(row)); * ListenableFuture<Result> future = hbase.asyncGet(get); * Futures.addCallback(future, new FutureCallback<Result>() { * public void onSuccess(Result result) { * List<String> ret = process(result); * result.complete(ret); * } * public void onFailure(Throwable thrown) { * result.completeExceptionally(thrown); * } * }); * } * } * }</pre> * * @param <IN> The type of the input elements. * @param <OUT> The type of the returned elements. */ @PublicEvolving public interface AsyncFunction<IN, OUT> extends Function, Serializable { /** * Trigger async operation for each stream input. * * @param input element coming from an upstream task * @param resultFuture to be completed with the result data * @exception Exception in case of a user code error. An exception will make the task fail and * trigger fail-over process. */ void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception; /** * {@link AsyncFunction#asyncInvoke} timeout occurred. * By default, the result future is exceptionally completed with a timeout exception.
* * @param input element coming from an upstream task * @param resultFuture to be completed with the result data */ default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception { resultFuture.completeExceptionally( new TimeoutException("Async function call has timed out.")); } } AsyncFunction接口继承了Function和Serializable,它定义了asyncInvoke方法以及一个default的timeout方法;asyncInvoke方法执行异步逻辑,然后通过ResultFuture.complete将结果设置到ResultFuture,如果出现异常则通过ResultFuture.completeExceptionally(Throwable)来传递到ResultFuture RichAsyncFunction flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java @PublicEvolving public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> { private static final long serialVersionUID = 3858030061138121840L; @Override public void setRuntimeContext(RuntimeContext runtimeContext) { Preconditions.checkNotNull(runtimeContext); if (runtimeContext instanceof IterationRuntimeContext) { super.setRuntimeContext( new RichAsyncFunctionIterationRuntimeContext( (IterationRuntimeContext) runtimeContext)); } else { super.setRuntimeContext(new RichAsyncFunctionRuntimeContext(runtimeContext)); } } @Override public abstract void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception; //...... } RichAsyncFunction继承了AbstractRichFunction,同时声明实现AsyncFunction接口,它并没有实现asyncInvoke,而是将其声明为abstract交由子类实现;它覆盖了setRuntimeContext方法:如果传入的RuntimeContext是IterationRuntimeContext,就用RichAsyncFunctionIterationRuntimeContext进行包装,否则用RichAsyncFunctionRuntimeContext进行包装 RichAsyncFunctionRuntimeContext flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java /** * A wrapper class for async function's {@link RuntimeContext}. The async function runtime * context only supports basic operations which are thread safe. Consequently, state access, * accumulators, broadcast variables and the distributed cache are disabled.
*/ private static class RichAsyncFunctionRuntimeContext implements RuntimeContext { private final RuntimeContext runtimeContext; RichAsyncFunctionRuntimeContext(RuntimeContext context) { runtimeContext = Preconditions.checkNotNull(context); } @Override public String getTaskName() { return runtimeContext.getTaskName(); } @Override public MetricGroup getMetricGroup() { return runtimeContext.getMetricGroup(); } @Override public int getNumberOfParallelSubtasks() { return runtimeContext.getNumberOfParallelSubtasks(); } @Override public int getMaxNumberOfParallelSubtasks() { return runtimeContext.getMaxNumberOfParallelSubtasks(); } @Override public int getIndexOfThisSubtask() { return runtimeContext.getIndexOfThisSubtask(); } @Override public int getAttemptNumber() { return runtimeContext.getAttemptNumber(); } @Override public String getTaskNameWithSubtasks() { return runtimeContext.getTaskNameWithSubtasks(); } @Override public ExecutionConfig getExecutionConfig() { return runtimeContext.getExecutionConfig(); } @Override public ClassLoader getUserCodeClassLoader() { return runtimeContext.getUserCodeClassLoader(); } // ----------------------------------------------------------------------------------- // Unsupported operations // ----------------------------------------------------------------------------------- @Override public DistributedCache getDistributedCache() { throw new UnsupportedOperationException("Distributed cache is not supported in rich async functions."); } @Override public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) { throw new UnsupportedOperationException("State is not supported in rich async functions."); } @Override public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) { throw new UnsupportedOperationException("State is not supported in rich async functions."); } @Override public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) { throw new 
UnsupportedOperationException("State is not supported in rich async functions."); } @Override public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) { throw new UnsupportedOperationException("State is not supported in rich async functions."); } @Override public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) { throw new UnsupportedOperationException("State is not supported in rich async functions."); } @Override public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) { throw new UnsupportedOperationException("State is not supported in rich async functions."); } @Override public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) { throw new UnsupportedOperationException("Accumulators are not supported in rich async functions."); } @Override public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) { throw new UnsupportedOperationException("Accumulators are not supported in rich async functions."); } @Override public Map<String, Accumulator<?, ?>> getAllAccumulators() { throw new UnsupportedOperationException("Accumulators are not supported in rich async functions."); } @Override public IntCounter getIntCounter(String name) { throw new UnsupportedOperationException("Int counters are not supported in rich async functions."); } @Override public LongCounter getLongCounter(String name) { throw new UnsupportedOperationException("Long counters are not supported in rich async functions."); } @Override public DoubleCounter getDoubleCounter(String name) { throw new UnsupportedOperationException("Long counters are not supported in rich async functions."); } @Override public Histogram getHistogram(String name) { throw new UnsupportedOperationException("Histograms are not supported in rich async functions."); } @Override public boolean hasBroadcastVariable(String name) 
{ throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions."); } @Override public <RT> List<RT> getBroadcastVariable(String name) { throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions."); } @Override public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) { throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions."); } } RichAsyncFunctionRuntimeContext实现了RuntimeContext接口,它将getTaskName、getMetricGroup、getIndexOfThisSubtask、getExecutionConfig、getUserCodeClassLoader等基本(线程安全的)方法委托给被包装的RuntimeContext,而state、accumulator、counter、histogram、broadcast variable以及distributed cache相关的方法都被覆盖为直接抛出UnsupportedOperationException RichAsyncFunctionIterationRuntimeContext flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java private static class RichAsyncFunctionIterationRuntimeContext extends RichAsyncFunctionRuntimeContext implements IterationRuntimeContext { private final IterationRuntimeContext iterationRuntimeContext; RichAsyncFunctionIterationRuntimeContext(IterationRuntimeContext iterationRuntimeContext) { super(iterationRuntimeContext); this.iterationRuntimeContext = Preconditions.checkNotNull(iterationRuntimeContext); } @Override public int getSuperstepNumber() { return iterationRuntimeContext.getSuperstepNumber(); } // ----------------------------------------------------------------------------------- // Unsupported operations // ----------------------------------------------------------------------------------- @Override public <T extends Aggregator<?>> T getIterationAggregator(String name) { throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions."); } @Override public <T extends Value> T getPreviousIterationAggregate(String name) { throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions."); } } 
RichAsyncFunctionIterationRuntimeContext继承了RichAsyncFunctionRuntimeContext,实现了IterationRuntimeContext接口,它将getSuperstepNumber方法交由IterationRuntimeContext处理,然后覆盖getIterationAggregator、getPreviousIterationAggregate方法抛出UnsupportedOperationException AsyncDataStream flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/AsyncDataStream.java @PublicEvolving public class AsyncDataStream { /** * Output mode for asynchronous operations. */ public enum OutputMode { ORDERED, UNORDERED } private static final int DEFAULT_QUEUE_CAPACITY = 100; private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator( DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, int bufSize, OutputMode mode) { TypeInformation<OUT> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType( func, AsyncFunction.class, 0, 1, new int[]{1, 0}, in.getType(), Utils.getCallLocationName(), true); // create transform AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>( in.getExecutionEnvironment().clean(func), timeout, bufSize, mode); return in.transform("async wait operator", outTypeInfo, operator); } public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait( DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit, int capacity) { return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED); } public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait( DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit) { return addOperator( in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.UNORDERED); } public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait( DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit, int capacity) { return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED); } public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait( DataStream<IN> in, 
AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit) { return addOperator( in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.ORDERED); } } AsyncDataStream提供了unorderedWait、orderedWait两类方法来将AsyncFunction作用于DataStream unorderedWait、orderedWait方法有带capacity参数的也有不带capacity参数的,不带capacity参数即默认使用DEFAULT_QUEUE_CAPACITY,即100;这些方法最后都是调用addOperator私有方法来实现,它使用的是AsyncWaitOperator;unorderedWait、orderedWait方法都带了timeout参数,用于指定等待async操作完成的超时时间 AsyncDataStream提供了两种OutputMode,其中UNORDERED是无序的,即一旦async操作完成就emit结果,当使用TimeCharacteristic.ProcessingTime的时候这种模式延迟最低、负载最低;ORDERED是有序的,即按element的输入顺序emit结果,为了保证有序operator需要缓冲数据,因而会造成一定的延迟及负载 小结 flink给外部数据访问提供了Asynchronous I/O的API,用于提升streaming的吞吐量,其基本使用就是定义一个实现AsyncFunction接口的function,然后使用AsyncDataStream的unorderedWait或orderedWait方法将AsyncFunction作用到DataStream作为transformation AsyncFunction接口继承了Function,它定义了asyncInvoke方法以及一个default的timeout方法;asyncInvoke方法执行异步逻辑,然后通过ResultFuture.complete将结果设置到ResultFuture,如果出现异常则通过ResultFuture.completeExceptionally(Throwable)来传递到ResultFuture;RichAsyncFunction继承了AbstractRichFunction,同时声明实现AsyncFunction接口,它并没有实现asyncInvoke,交由子类实现;它覆盖了setRuntimeContext方法,这里使用RichAsyncFunctionRuntimeContext或者RichAsyncFunctionIterationRuntimeContext进行包装 AsyncDataStream的unorderedWait或orderedWait有两个关于async operation的参数,一个是timeout参数用于设置async的超时时间,一个是capacity参数用于指定同一时刻最大允许多少个(并发)async request在执行;AsyncDataStream提供了两种OutputMode,其中UNORDERED是无序的,即一旦async操作完成就emit结果,当使用TimeCharacteristic.ProcessingTime的时候这种模式延迟最低、负载最低;ORDERED是有序的,即按element的输入顺序emit结果,为了保证有序operator需要缓冲数据,因而会造成一定的延迟及负载