(sqoop/Flume/NiFi/Kafka)
收据采集->>数据录入->>数据清洗->>数据处理->>数据集成->>数据监管->>数据分析->>数据服务
数据采集 确定数据需求 确定需要采集的数据字段 制定采集方法 验证采集数据的有效性 数据采集的关注点 数据有效性
数据来源 | 提取方法 | 目标 |
---|---|---|
业务数据 (RDB) | 文件导出 | 数据集成 |
Sqoop数据导入 | ||
网站日志 | Flume / NiFi / Kafka (重点) | |
伙伴数据 | 数据集成/ 服务 | |
社交网络 / 公开数据 | 数据爬取 | 数据集成 |
消息公告板Email / 会议数据 | 特殊的数据提取方式 | 数据集成 |
物联网设备数据 | NiFi / 特殊的数据提取方式 | 数据集成 |
其他 | 特殊的数据提取方式 |
数据质量是数据采集阶段最重要的
常见数据质量问题
数据质量判断原则 准确、完整、完备、有效、一致、格式统一、不重复
数据校验验证数据集中的数据是否有效
数据校验的前提
校验数据的方法
数据校验会多次实施
数据校验工具-voluptuous
python>> pip install voluptuous
用Schema校验数据有效性
使用fillna的多种方式填充NaN值
使用interpolate()插值器填充NaN值,根据日期或时间按值等差填充
使用dropna删除包含缺失值的记录
异常值 合法但远离大部分数据的值
判断异常值 通过标准差计算确定异常值范围 标准差取值范围T,绝对值大于T的值 通过数据频率分布计算异常值范围 超出90%数值分布的值
修正异常值的影响 Winsorizing(温莎法)
SPARK_HOME 和 SPARK_CONF_DIR
ipython
from notebook.auth import passwd
passwd()
#键入密码
#获取sha1值,复制
#rw
#sha1:0cc7d44db1b9:1ce93f146c1e0faaebf73740ca9db8ba90c7adde
cd~
jupyter notebook --generate-config
vi ./.jupyter/jupyter_notebook_config.py
#添加输入以下内容
c.NotebookApp.allow_root = True
c.NotebookApp.ip = ‘*‘
c.NotebookApp.open_browser = False
c.NotebookApp.password = ‘sha1:粘贴上一步复制的值‘
c.NotebookApp.port = 7070
cd~
vi /etc/profile
#添加以下内容
export PYSPARK_PYTHON=$ANACONDA_HOME/bin/python3
export PYSPARK_DRIVER_PYTHON=$ANACONDA_HOME/bin/jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
ipython_opts="notebook -pylab inline"
source /etc/profile
cd~
vi .jupyter/jupyter_notebook_config.py
#添加以下内容
c.NotebookApp.notebook_dir=‘自己定义的工作目录‘
在pyspark中可以直接使用spark,语法与scala类似,主要有以下不同之处
1、匿名函数写法:scala是直接写,py是lambda表达式
2、可迭代对象(列表、列)的取值符号,scala是()或[],py可能是反的
使用pyspark解析复杂字段
from pyspark.sql.functions import *
from pyspark.sql.types import *
df = spark.read.option("header", "true").csv("file:///root/example/movies_metadata.csv")
# Define the schema for the movie category data field
genres = ArrayType(StructType([StructField("id", IntegerType(), False), StructField("name", StringType(), False)]))
# Organize the movie category with the original move id
df_MovieCategory = df.withColumn("movie_category", from_json(col("genres"), genres)) .select(col("id"), col("movie_category")).select(col("id"), explode(col("movie_category"))) .select(col("id"), col("col.name"))
原文:https://www.cnblogs.com/whoyoung/p/11424212.html