
Importing data from Kafka into Doris


 

For details, see the official documentation: http://doris.apache.org/master/zh-CN/sql-reference/sql-statements/Data%20Manipulation/ROUTINE%20LOAD.html#description

 

 

Below is my own test example:

1. Create the table

The table uses the Doris aggregate model: rows with the same key (siteid, citycode, username) are merged, and pv is accumulated with SUM.

CREATE TABLE IF NOT EXISTS user
(
    siteid INT DEFAULT 10,
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT 0
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

 

 

2. Prepare the data (send test messages to the Kafka topic "doris")

pls input topic:doris
pls input msg:6|12|pp|123
send over !!!
pls input msg:7|32|ww|231
send over !!!
pls input msg:8|12|ee|213
send over !!!
pls input msg:
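
The prompts above come from a small interactive test producer. A minimal Python sketch of such a producer, assuming the kafka-python package (the broker address is the one used in the routine load job below):

    # Test producer sketch (assumes: pip install kafka-python).
    # The broker address matches the one used in the ROUTINE LOAD job below.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers="192.168.18.129:9092")

    topic = input("pls input topic:")            # e.g. doris
    while True:
        msg = input("pls input msg:")            # e.g. 6|12|pp|123  -> siteid|citycode|username|pv
        if not msg:
            break
        # Plain "|"-separated text, matching COLUMNS TERMINATED BY "|" in the load job
        producer.send(topic, msg.encode("utf-8"))
        producer.flush()
        print("send over !!!")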

 

3. Load the data into Doris (sea.user: the job label, which must be unique within the database; COLUMNS: the column mapping)

 CREATE ROUTINE LOAD sea.user ON 
 user COLUMNS TERMINATED BY "|",
 COLUMNS(siteid,citycode,username,pv)
 PROPERTIES(
 "desired_concurrent_number"="1",
 "max_batch_interval"="20",
 "max_batch_rows"="300000",
 "max_batch_size"="209715200")
 FROM KAFKA(
 "kafka_broker_list"="192.168.18.129:9092",
 "kafka_topic"="doris",
 "property.group.id"="gid",
 "property.clinet.id"="cid",
 "property.kafka_default_offsets"="OFFSET_BEGINNING");
1) OFFSET_BEGINNING: start consuming from the earliest offset that still has data.
2) OFFSET_END: start consuming from the end of the topic (only newly arriving messages).

3.1 Check the routine load status

SHOW ALL ROUTINE LOAD FOR  sea.user;

 

Check routine load status (datasource_name.kafka_load stands for your own database and job name):
SHOW ALL ROUTINE LOAD FOR datasource_name.kafka_load;

Other common routine load commands:
a) Pause a routine load:
PAUSE ROUTINE LOAD FOR datasource_name.kafka_load;
b) Resume a routine load:
RESUME ROUTINE LOAD FOR datasource_name.kafka_load;
c) Stop a routine load:
STOP ROUTINE LOAD FOR datasource_name.kafka_load;
d) Show all routine load jobs:
SHOW [ALL] ROUTINE LOAD FOR datasource_name.kafka_load;
e) Show routine load tasks:
SHOW ROUTINE LOAD TASK WHERE JobName = "kafka_load";

Query the loaded data:
SELECT * FROM datasource_name.table_name LIMIT 10;

 

 

4. Check the data

(Screenshot of the query results omitted.)
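
Since the screenshot is not reproduced here, one way to verify the result is to query the table over Doris's MySQL protocol. A minimal Python sketch, assuming the pymysql package and that the FE listens on 192.168.18.129:9030 with user root and an empty password (these connection details are assumptions about this test environment, not values from the original post):

    # Verification sketch (assumes: pip install pymysql).
    # Doris FE speaks the MySQL protocol; host/port/user/password are assumed values.
    import pymysql

    conn = pymysql.connect(host="192.168.18.129", port=9030,
                           user="root", password="", database="sea")
    with conn.cursor() as cur:
        cur.execute("SELECT siteid, citycode, username, pv FROM user")
        for row in cur.fetchall():
            print(row)    # expect the three "|"-separated test rows sent earlier
    conn.close()

The same check can be done from any MySQL client connected to the FE with SELECT * FROM sea.user;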

 

 

 

 

 

Examples from the official documentation:

4. Importing JSON in simple mode
    CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
    COLUMNS(category,price,author)
    PROPERTIES
    (
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "0,0,0"
    );
    Two JSON data formats are supported:
  1){"category":"a9jadhx","author":"test","price":895}
  2)[
            {"category":"a9jadhx","author":"test","price":895},
            {"category":"axdfa1","author":"EvelynWaugh","price":1299}
     ]

Notes: 1) If the JSON data starts with an array and each object in the array is one record, set strip_outer_array to true to flatten the array (see the producer sketch below).

   2) If the JSON data starts with an array and each object in the array is one record, then when defining jsonpaths the ROOT node is actually an object inside the array.
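
To make the two shapes concrete, a hedged Python sketch that produces one message of each shape (the kafka-python package is an assumption; broker and topic names follow the docs example above):

    # Sketch: the two supported JSON message shapes (assumes kafka-python).
    import json
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers="broker1:9092",
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )

    # 1) One JSON object per message -- loads without strip_outer_array
    producer.send("my_topic", {"category": "a9jadhx", "author": "test", "price": 895})

    # 2) One JSON array per message -- needs "strip_outer_array" = "true",
    #    so that each element of the array becomes one row
    producer.send("my_topic", [
        {"category": "a9jadhx", "author": "test", "price": 895},
        {"category": "axdfa1", "author": "EvelynWaugh", "price": 1299},
    ])
    producer.flush()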

 

6. Specify the root node with json_root
    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
    PROPERTIES
    (
        "desired_concurrent_number"="3",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "strict_mode" = "false",
        "format" = "json",
        "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
        "strip_outer_array" = "true",
        "json_root" = "$.RECORDS"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic" = "my_topic",
        "kafka_partitions" = "0,1,2",
        "kafka_offsets" = "0,0,0"
    );

JSON data format:

{
  "RECORDS": [
    {
      "category": "11",
      "title": "SayingsoftheCentury",
      "price": 895,
      "timestamp": 1589191587
    },
    {
      "category": "22",
      "author": "2avc",
      "price": 895,
      "timestamp": 1589191487
    },
    {
      "category": "33",
      "author": "3avc",
      "title": "SayingsoftheCentury",
      "timestamp": 1589191387
    }
  ]
}
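
For completeness, such a nested payload could be sent as one Kafka message as sketched below (same kafka-python assumption as in the earlier sketches); with "json_root" = "$.RECORDS" and "strip_outer_array" = "true", Doris reads the array under RECORDS and turns each element into a row:

    # Sketch: producing the nested RECORDS payload shown above (assumes kafka-python).
    import json
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers="broker1:9092")
    payload = {
        "RECORDS": [
            {"category": "11", "title": "SayingsoftheCentury", "price": 895, "timestamp": 1589191587},
            {"category": "22", "author": "2avc", "price": 895, "timestamp": 1589191487},
            {"category": "33", "author": "3avc", "title": "SayingsoftheCentury", "timestamp": 1589191387},
        ]
    }
    producer.send("my_topic", json.dumps(payload).encode("utf-8"))
    producer.flush()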

 


Original post: https://www.cnblogs.com/lshan/p/14790587.html
