自学内容网 自学内容网

【Flink实战】flink消费http数据并将数组展开多行

一. 需求描述

flink消费http接口的数据,将json中的数组展开多行

如下样例数据以及要求处理的数据效果

{  
  "name": "John Doe",  
  "age": 30,  
  "address": {  
    "street": {  
      "street": "123 Main St",  
      "city": "New York",  
      "state": "NY"  
    },  
    "city": "New York",  
    "state": "NY"  
  },  
  "phoneNumbers": [  
    {  
      "type": "home",  
      "number": "212-555-1234"  
    },  
    {  
      "type": "fax",  
      "number": "646-555-4567"  
    }  
  ],  
  "children": [],  
  "spouse": null  
}

nameagestreetcitystatephone_typephone_number
John Doe30123 Main StNew YorkNYhome212-555-1234
John Doe30123 Main StNew YorkNYfax646-555-4567

二. 方案思路

1. 解决思路

  1. flink 消费http接口的数据(json),发送到下游
  2. 下游算子解析json数据,当遇到数组时,算子解析返回array
  3. 通过使用CROSS JOIN 将数组数据拍平,如上表格展现

2. flink json 解析

2.1. 通过json path解析非array数据

如下通过flink内置函数:JSON_VALUE 进行数据解析,支持多种类型的输出,默认输出为string。

在这里插入图片描述

这里使用 cast转换,如下举例

cast(JSON_VALUE(json_string,'$.id') as int) as id ,  
JSON_VALUE(json_string,'$.name')  as name,  
cast(JSON_VALUE(json_string,'$.details.age.real') as int) as  `real`  ,  
JSON_VALUE(json_string,'$.details.address') as address,

 

2.2. 通过json path解析array数据

官网:目前JSON_QUERY虽然能够包装为array但实际上总是会返回为string,不符合要求。

在这里插入图片描述

如下:

<dependencies>  
    <dependency>  
        <groupId>com.jayway.jsonpath</groupId>  
        <artifactId>json-path</artifactId>  
        <version>2.6.0</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-table-common</artifactId>  
        <version>${flink.version}</version>  
        <scope>provided</scope>  
    </dependency>

通过udf解决

package com.dtstack.chunjun.local.test;  
  
import com.jayway.jsonpath.JsonPath;  
import org.apache.flink.table.functions.ScalarFunction;  
  
import java.util.ArrayList;  
import java.util.List;  
  
public class JsonArrayFieldExtractor extends ScalarFunction {  
  
    public List<String> eval(String jsonString, String jsonPath) {  
        if (jsonString == null || jsonString.isEmpty()) {  
            return new ArrayList<String>();  
        }  
        try {  
            List<?> result = JsonPath.read(jsonString, jsonPath);   
            List<String> stringList = new ArrayList<>();  
            for (Object obj : result) {  
                stringList.add(obj.toString());  
            }  
            return stringList;  
        } catch (Exception e) {  
            return new ArrayList<String>();  
        }  
    }  
  
}

3. CROSS JOIN逻辑

Array Expansion

在这里插入图片描述

注意:CROSS JOIN 返回两个连接表的笛卡尔积,当有多个数组时会产生笛卡尔积。比如:两个数组,分别有100个元素,那么如果使用两次CROSS JOIN 则会产生1万行数据。

 

三. 方案实现

1. http json数据样例

{  
  "id": 1,  
  "name": "Alice",  
  "details": {  
    "age": {"real":11},  
    "address": "123Mainst",  
    "contacts": [  
      {  
        "type": "email",  
        "value": "alice@example.com"  
      },  
      {  
        "type": "phone",  
        "value": "123-456-7890"  
      }  
    ],  
    "grade": [  
      {  
        "grade": [{"zz":11},{"zz":11}],  
        "bb": {"rr":{"yy":"alice@example.com"}}  
      },  
      {  
        "grade": [{"zz":22}],  
        "bb": {"rr":{"yy":"alice@example.com"}}  
      }  
    ]  
  }  
}

 

2. flink sql 说明

CREATE TEMPORARY SYSTEM FUNCTION get_json_array AS 'com.dtstack.chunjun.local.test.JsonArrayFieldExtractor';

CREATE TABLE source
(
       json_string varchar
) WITH (
      'connector' = 'http-x'
      ,'url' = 'http://localhost:8088/api/arraypage'
      ,'intervalTime'= '3000'
      ,'method'='get'                              --请求方式:get 、post
      ,'decode'='text'                             -- 数据格式:只支持json模式
                                                   -- 以下4个参数要同时存在:
      ,'page-param-name'='pagenum'                          -- 多次请求参数1:分页参数名:例如:pageNum
      ,'start-index'='1'                             -- 多次请求参数2:开始的位置
      ,'end-index'='4'                               -- 多次请求参数3:结束的位置
      ,'step'='1'                                  -- 多次请求参数4:步长:默认值为1
      );

CREATE TABLE sink
(
    id               int,
    name             varchar,
    `real`               int,
    address                varchar,
    zz                int,
    yy                varchar
) WITH (
      'connector' = 'print'
      );


insert into sink   SELECT
                      cast(JSON_VALUE(json_string,'$.id') as int) as id ,
                       JSON_VALUE(json_string,'$.name')  as name,
                       cast(JSON_VALUE(json_string,'$.details.age.real') as int) as  `real`  ,
                        JSON_VALUE(json_string,'$.details.address') as address,
                        cast(`$.grade[*].grade[*].zz` as int ) as zz,
                        `$.details.grade[*].bb.rr.yy` as yy
                      FROM source
                    CROSS JOIN UNNEST(get_json_array(json_string, '$.details.grade[*].grade[*].zz' )) AS T(`$.grade[*].grade[*].zz`)
                    CROSS JOIN UNNEST(get_json_array(json_string, '$.details.grade[*].bb.rr.yy'   )) AS T1(`$.details.grade[*].bb.rr.yy`);





--{
--  "id": 1,
--  "name": "Alice",
--  "details": {
--    "age": {"real":11},
--    "address": "123Mainst",
--    "contacts": [
--      {
--        "type": "email",
--        "value": "alice@example.com"
--      },
--      {
--        "type": "phone",
--        "value": "123-456-7890"
--      }
--    ],
--    "grade": [
--      {
--        "grade": [{"zz":11},{"zz":11}],
--        "bb": {"rr":{"yy":"alice@example.com"}}
--      },
--      {
--        "grade": [{"zz":22}],
--        "bb": {"rr":{"yy":"alice@example.com"}}
--      }
--    ]
--  }
--}

消费结果
在这里插入图片描述

具体逻辑描述

  1. http连接器消费http接口数据 具体使用chunjun的http连接器,相关代码见:我提供的相关pr:
    [feature-DTStack#1775][connector][http] http supports offline mode

  2. 使用JSON_VALUE、get_json_array解析为string和array<string>,之后使用cast进行类型转换

  3. CROSS JOIN 生成笛卡尔积


原文地址:https://blog.csdn.net/hiliang521/article/details/142412086

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!