加载中......
输入验证码,即可复制
微信扫码下载好向圈APP, 登陆后即可进入消息页面查看验证码
只需要3秒时间
本文将对Flink Transformation中map、filter和flatMap算子进行介绍,并结合例子展示具体使用方法。
一、map算子

map算子可以用来做一些清洗工作,根据具体的业务,map算子使用自定义的map函数进行处理,具体的转换是:输入一个DataStream元素对应输出一个DataStream元素。
示例如下:
功能描述:从外部文件读取,温度传感器样例数据,转换成温度传感器的实体类
温度传感器实体类代码:
public static class SensorReading{        // 传感器 id        public String sensorId;        // 时间戳        public String timeStamp;        // 温度        public Double temperature;        // 状态描述        public  String lowOrhigt;        // 状态标识        public String status;        public SensorReading() {}        public  SensorReading(String sensorId,String timeStamp,Double temperature){            this.sensorId = sensorId;            this.timeStamp = timeStamp;            this.temperature = temperature;        }        public  SensorReading(String sensorId,Double temperature,String status){            this.sensorId = sensorId;            this.temperature = temperature;            this.status=status;        }        public  SensorReading(String sensorId,Double temperature){            this.sensorId = sensorId;            this.temperature = temperature;        }        @Override        public String toString() {            return "SensorReading{" +                    "sensorId='" + sensorId + '\'' +                    ", timeStamp=" + timeStamp +                    ", temperature=" + temperature +                    ", lowOrhigt=" + lowOrhigt +                    ", status=" + status +                    '}';        }    }
转换代码:
// 初始化执行环境对象        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        // 从文件中读取数据        String inputPath  = "F:\\Projects\\BigData\\Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt";        // 获取数据        DataStreamSource<String> dataStream  = env.readTextFile(inputPath);        // 1、先转换成SensorReading类型(简单转换操作)        DataStream<SensorReading> stream1 =  dataStream.map(new MapFunction<String, SensorReading>() {            @Override            public SensorReading map(String data) throws Exception {                String[] arr = data.split(",");                //System.out.println(arr[0]+"->"+arr[1]+"->"+arr[2]);                return new SensorReading(arr[0], arr[1], Double.valueOf(arr[2].toString()));            }        });二、filter算子

filter算子是过滤筛选,将所有符合判断条件的结果集输出。算子对每个元素进行过滤,使用filter函数进行逻辑判断,对于输入的每个元素,如果符合判断条件则返回True,保留这元素,不符合则返回False,忽略这元素。
示例如下:
功能描述:过滤DataStream数据流中的温度传感器每个元素,筛选出温度大于30度的元素。 元素对应的数据类型为SensorReading(温度传感器)。
过滤代码如下:
SingleOutputStreamOperator<SensorReading> resultFilter  =      stream1.filter(new FilterFunction<SensorReading>() {          @Override         public boolean filter(SensorReading sensorReading) throws Exception {             if( sensorReading.temperature > 30.0 ){                 return true;             }              return false;            }        });三、flatMap算子

flatMap算子是打平操作和map有些相似,输入都是数据流中的每个元素,与之不同的是,flatMap可以输出是零个、一个或多个元素。
flatMap可以具有map和filter。但是map和filter的语义更明确,有助于提高代码的可读性。其中map表示一对一的转换,一个输入,对应一个输出。其中filter表示对输入数据进行过滤操作。所以Flink同时提供这三个API根据具体业务场景,灵活开发。
示例如下:
//针对流中的每一个元素生成,0个,1个或多个元素,可以实现map filterSingleOutputStreamOperator<SensorReading> resultFlatMap =stream1.flatMap(new FlatMapFunction<SensorReading, SensorReading>() {@Override    public void flatMap(SensorReading sensorReading, Collector<SensorReading> out) throws Exception {        // 这里可以进行类似filter过滤        if( sensorReading.temperature > 30.0 ){            // 这里可以进行类似map具体转换工作,可以输出0到多个元素            out.collect(sensorReading);            out.collect(sensorReading);        }    }});// flatMap实现map功能 对每个温度传感器度数,加0.1度SingleOutputStreamOperator<SensorReading> resultFlatMap1 =resultFlatMap.flatMap(new FlatMapFunction<SensorReading, SensorReading>() {@Override    public void flatMap(SensorReading sensorReading, Collector<SensorReading> out) throws Exception {        sensorReading.temperature +=0.1;        out.collect(sensorReading);    }});//flatMap实现filter功能,筛选出温度大于30度的元素SingleOutputStreamOperator<SensorReading> resultFlatMap2 =resultFlatMap1.flatMap(new FlatMapFunction<SensorReading, SensorReading>() {    @Override    public void flatMap(SensorReading sensorReading, Collector<SensorReading> out) throws Exception {        if( sensorReading.temperature > 30.0 ){            out.collect(sensorReading);        }    }});
map、filter、flatMap示意图如下:

Flink常用算子map、filter和flatMap示例-1.jpg
JAVA圈
23353 查看 0 0 反对

说说我的看法高级模式

您需要登录后才可以回帖 登录|立即注册

还没人评论此主题哦