本文将对Flink Transformation中map、filter和flatMap算子进行介绍,并结合例子展示具体使用方法。
一、map算子
map算子可以用来做一些清洗工作,根据具体的业务,map算子使用自定义的map函数进行处理,具体的转换是:输入一个DataStream元素对应输出一个DataStream元素。
示例如下:
功能描述:从外部文件读取,温度传感器样例数据,转换成温度传感器的实体类
温度传感器实体类代码:
public static class SensorReading{ // 传感器 id public String sensorId; // 时间戳 public String timeStamp; // 温度 public Double temperature; // 状态描述 public String lowOrhigt; // 状态标识 public String status; public SensorReading() {} public SensorReading(String sensorId,String timeStamp,Double temperature){ this.sensorId = sensorId; this.timeStamp = timeStamp; this.temperature = temperature; } public SensorReading(String sensorId,Double temperature,String status){ this.sensorId = sensorId; this.temperature = temperature; this.status=status; } public SensorReading(String sensorId,Double temperature){ this.sensorId = sensorId; this.temperature = temperature; } @Override public String toString() { return "SensorReading{" + "sensorId='" + sensorId + '\'' + ", timeStamp=" + timeStamp + ", temperature=" + temperature + ", lowOrhigt=" + lowOrhigt + ", status=" + status + '}'; } }
转换代码:
// 初始化执行环境对象 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 从文件中读取数据 String inputPath = "F:\\Projects\\BigData\\Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt"; // 获取数据 DataStreamSource<String> dataStream = env.readTextFile(inputPath); // 1、先转换成SensorReading类型(简单转换操作) DataStream<SensorReading> stream1 = dataStream.map(new MapFunction<String, SensorReading>() { @Override public SensorReading map(String data) throws Exception { String[] arr = data.split(","); //System.out.println(arr[0]+"->"+arr[1]+"->"+arr[2]); return new SensorReading(arr[0], arr[1], Double.valueOf(arr[2].toString())); } });
二、filter算子
filter算子是过滤筛选,将所有符合判断条件的结果集输出。算子对每个元素进行过滤,使用filter函数进行逻辑判断,对于输入的每个元素,如果符合判断条件则返回True,保留这元素,不符合则返回False,忽略这元素。
示例如下:
功能描述:过滤DataStream数据流中的温度传感器每个元素,筛选出温度大于30度的元素。 元素对应的数据类型为SensorReading(温度传感器)。
过滤代码如下:
SingleOutputStreamOperator<SensorReading> resultFilter = stream1.filter(new FilterFunction<SensorReading>() { @Override public boolean filter(SensorReading sensorReading) throws Exception { if( sensorReading.temperature > 30.0 ){ return true; } return false; } });
三、flatMap算子
flatMap算子是打平操作和map有些相似,输入都是数据流中的每个元素,与之不同的是,flatMap可以输出是零个、一个或多个元素。
flatMap可以具有map和filter。但是map和filter的语义更明确,有助于提高代码的可读性。其中map表示一对一的转换,一个输入,对应一个输出。其中filter表示对输入数据进行过滤操作。所以Flink同时提供这三个API根据具体业务场景,灵活开发。
示例如下:
//针对流中的每一个元素生成,0个,1个或多个元素,可以实现map filterSingleOutputStreamOperator<SensorReading> resultFlatMap =stream1.flatMap(new FlatMapFunction<SensorReading, SensorReading>() {@Override public void flatMap(SensorReading sensorReading, Collector<SensorReading> out) throws Exception { // 这里可以进行类似filter过滤 if( sensorReading.temperature > 30.0 ){ // 这里可以进行类似map具体转换工作,可以输出0到多个元素 out.collect(sensorReading); out.collect(sensorReading); } }});// flatMap实现map功能 对每个温度传感器度数,加0.1度SingleOutputStreamOperator<SensorReading> resultFlatMap1 =resultFlatMap.flatMap(new FlatMapFunction<SensorReading, SensorReading>() {@Override public void flatMap(SensorReading sensorReading, Collector<SensorReading> out) throws Exception { sensorReading.temperature +=0.1; out.collect(sensorReading); }});//flatMap实现filter功能,筛选出温度大于30度的元素SingleOutputStreamOperator<SensorReading> resultFlatMap2 =resultFlatMap1.flatMap(new FlatMapFunction<SensorReading, SensorReading>() { @Override public void flatMap(SensorReading sensorReading, Collector<SensorReading> out) throws Exception { if( sensorReading.temperature > 30.0 ){ out.collect(sensorReading); } }});
map、filter、flatMap示意图如下: