首页 > 其他 > 详细

Azure Synapse Analysis 开箱 Blog - 叁 -- Function ETL

时间:2020-07-13 15:57:38      阅读:52      评论:0      收藏:0      [点我收藏+]

        继续上一篇的内容,本文将为大家介绍将 ChangeFeed 抽取逻辑通过 Azure Function 服务实现,架构图如下,通过 Function 服务来执行 ChangeFeed 的读取,并将其转存至 DataLake 存储中。

技术分享图片

 

        在前文最后我们介绍了通过 Function 服务可以简化整个逻辑的代码开发,Azure Function 服务中原生已经内置了很多与 Azure 其他服务原生集成的连接器,可以帮助客户实现与上下游服务的对接,用户无需关注连接器的实现,通过框架的调用,用户可以直接可以通过对象访问到上下游服务中的数据,用户只需要关注业务逻辑即可。Cosmos Database 也在 Azure Function 的支持之中,并且内置的连接器采用的也是 ChangeFeed 来实现的,用户可以直接开箱即用实现 ChangeFeed 数据的读取,无需自己维护抽取逻辑代码。Azure Function 原生支持的连接器如下:

 

Type1.x2.x and higher1TriggerInputOutput
Blob storage ? ? ? ? ?
Cosmos DB ? ? ? ? ?
Event Grid ? ? ?   ?
Event Hubs ? ? ?   ?
HTTP & webhooks ? ? ?   ?
IoT Hub ? ? ?   ?
Microsoft Graph
Excel tables
  ?   ? ?
Microsoft Graph
OneDrive files
  ?   ? ?
Microsoft Graph
Outlook email
  ?     ?
Microsoft Graph
events
  ? ? ? ?
Microsoft Graph
Auth tokens
  ?   ?  
Mobile Apps ?     ? ?
Notification Hubs ?       ?
Queue storage ? ? ?   ?
SendGrid ? ?     ?
Service Bus ? ? ?   ?
SignalR   ?   ? ?
Table storage ? ?   ? ?
Timer ? ? ?    
Twilio ? ?     ?

         下面我们来开始操作:

1. 准备开发环境,建议使用 Visual Studio Code,其内置的 Azure Function 的开发扩展,可以方便开发

参考:https://docs.microsoft.com/en-us/azure/developer/python/tutorial-vs-code-serverless-python-01

2. 创建 Azure Function 项目,注意在选择 trigger 部分,选择 cosmos db

参考:https://docs.microsoft.com/en-us/azure/developer/python/tutorial-vs-code-serverless-python-02

3. 准备 function.json 配置文件,function.json 主要描述 function 与上下游数据连接的参数,其中下述配置中 type:cosmosDBTrigger 部分定义了 cosmos 的连接信息, databaseName,collectionName 替换为前面所创建的 cosmos db的名称,leaseCollectionName 是 function 用来维护租约和 CheckPoint。connectionStringSetting 参见后续 local.settings.json。type:blob 部分定义了 function 下游存储 Data Lake 的连接信息,其中 path 参数定义了 function 抽取增量变化数据在 Data Lake 中的存储路径,connection 参数参见后续 local.settings.json。另外 feedPollDelay 参数表示 function 服务轮询 ChangeFeed 数据的间隔,其单位为毫秒,在演示中建议大家可以设置为 60000,实际根据数据水线处理时间和数据更新需求来决定。

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "cosmosDBTrigger",
      "name": "documents",
      "direction": "in",
      "leaseCollectionName": "<leasecollectionname>",
      "connectionStringSetting": "Cosmos_DOCUMENTDB",
      "databaseName": "<databasename>",
      "collectionName": "<collectionname>",
      "createLeaseCollectionIfNotExists": "true",
      "feedPollDelay": <pollinterval>
    },
    {
      "name": "outputblob",
      "type": "blob",
      "path": "<filesystemname>/{DateTime}.csv",
      "connection": "ChangeFeedResultStorage",
      "direction": "out"
    }
  ]
}

4. 准备 local.settings.json,该配置文件中定义了在上述 function.json 中所引用的连接密钥参数, 其中 Cosmos_DOCUMENTDB 和 ChangeFeedResultStorage 内分别填入,Cosmos 和 Data Lake 的连接字符串,可以在 portal 中对应资源的 access 信息部分获取。

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "<connectionstring>",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "Cosmos_DOCUMENTDB": "<cosmosconnectionstring>",
    "ChangeFeedResultStorage": "<datalakeconnectionstring>"
  }
}

5. 准备业务逻辑代码 init.py,init.py 是 function 函数被拉起后的 entrypoint 函数,我们将前面介绍的抽取 changefeed 增量变化数据的代码逻辑定义其中,下述演示代码中通过调用 documents 和 outputblob 借助 function 内置的连接器实现对上下游数据访问,无需再自己开发集成代码。用户只需要开发自己的数据处理逻辑即可,演示中是将增量变化数据转存到 Data Lake 存储中。

import logging
import json 
import csv
import io

import azure.functions as func

# read changefeed from cosmos, store as CSV file
def main(documents: func.DocumentList, outputblob: func.Out[str]) :    
    if documents:
        count = 0
        outputresult = io.StringIO()
        csv_writer = csv.writer(outputresult, quoting=csv.QUOTE_NONNUMERIC)
        for document in documents:
            if count == 0:
               header = json.loads(document.to_json()).keys()
               csv_writer.writerow(header)
               count += 1
            csv_writer.writerow(json.loads(document.to_json()).values())    
        outputblob.set(outputresult.getvalue())

 6. 通过 VS Code Function 本地调试工具进行测试,可以仿真通过前面 blog 中的数据插入函数在 cosmos db 内插入一些新的数据,然后在 Data Lake 中是否转存成功。

参考:https://docs.microsoft.com/en-us/azure/developer/python/tutorial-vs-code-serverless-python-04?tabs=powershell

7. 将 Function 服务打包发布至 Azure Function 服务中,上述开发测试均在本地完成,测试无误后将代码正式发布至 Azure Function 服务

参考:https://docs.microsoft.com/en-us/azure/developer/python/tutorial-vs-code-serverless-python-05

        至此通过 Function 服务完成 ChangeFeed 读取及转存至 DataLake 已经完成。整个 Function 代码中 function.json 中针对不同连接器的参数说明大家可以参阅:https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-cosmosdb?tabs=csharp

Azure Synapse Analysis 开箱 Blog - 叁 -- Function ETL

原文:https://www.cnblogs.com/wekang/p/13282407.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!