Installing and Using Great Expectations (GE) for Data Health Checks
Table of Contents
Software Introduction
Environment Information
Installing GE
Using GE
Standard Workflow
In-Memory Mode
Import Dependencies
Create the GE Context
Configure the Context Datasource
Generate the DataFrame
Create the Batch Request
Create the Suite
Create the Validator
Set Validation Rules
Run Validation and Print Results
Sample Result
DEMO
Software Introduction
Great Expectations is a tool for checking data health (data quality).
Docs: https://greatexpectations.io/
It supports health checks on data in databases, file systems, cloud storage, and in memory.
Supported databases: Athena, BigQuery, MSSQL, MySQL, PostgreSQL, Redshift, Snowflake, SQLite.
File systems can be accessed through a Spark or a Pandas execution engine.
The following cloud storage can be connected through Spark or Pandas: AWS S3, GCS (Google), Azure Blob Storage (Microsoft).
In-memory data can be checked through Spark and Pandas.
GE data health checks can validate, for example:
1. Whether a column contains null values
2. Whether a column's values fall within a given range
3. Whether the number of rows meets a threshold
...
(265 kinds of checks are supported in total; see https://greatexpectations.io/expectations/ for the full list.)
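For reference, the three example checks above correspond to built-in expectation types in that gallery (names as listed there):
- expect_column_values_to_not_be_null (null check on a column)
- expect_column_values_to_be_between (value-range check on a column)
- expect_table_row_count_to_be_between (row-count threshold check on a table)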
This article only covers how to use GE together with Spark for in-memory data health checks.
We start with the installation.
Environment Information
OS: macOS or Red Hat 7.3.1-13
Software: Python 3.7 (Python 3.6.1 or later is recommended; older versions may fail to run)
Dependencies: pip/pip3, ruamel.yaml, scipy, matplotlib, python-ldap, typing, git, spark
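If any of the Python dependencies are missing, they can usually be installed with pip3; the package names below are my best guess for the list above, so adjust them to your environment:
pip3 install ruamel.yaml scipy matplotlib python-ldap typing pyspark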
Installing GE
Installing GE is very simple.
Download the GE source code and install it:
git clone https://github.com/superconductive/ge_tutorials
cd ge_tutorials
pip install great_expectations
Check whether the installation succeeded; if nothing is returned, reinstall all dependencies with pip3:
great_expectations --version
If the installation succeeded, the command returns something like the following:
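(The exact output depends on the installed version; it should look roughly like this.)
great_expectations, version 0.15.26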
Using GE
Standard Workflow
GE's basic workflow is as follows:
- Create a datasource connection; this is very simple when the data lives in a database GE supports.
- Create a suite and configure the validation rules; letting GE generate basic rules automatically from the datasource is very simple.
- Create a checkpoint, GE's validation definition, which references the datasource and the suite.
Once these three steps are done, run the validation with great_expectations checkpoint run (a rough sketch of the CLI commands follows).
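As a rough sketch, and assuming the legacy GE CLI used in this article, the three steps map onto commands like these (my_checkpoint is just a placeholder name):
great_expectations datasource new                  # 1. create the datasource
great_expectations suite new                       # 2. create the suite and its rules
great_expectations checkpoint new my_checkpoint    # 3. create the checkpoint
great_expectations checkpoint run my_checkpoint    # run the validation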
In-Memory Mode
Import Dependencies
Open an empty Python file and import the dependencies:
from ruamel import yaml
from pyspark.sql import SparkSession
from typing import Any, Dict, List, Optional
import great_expectations as ge
from great_expectations.core.batch import BatchRequest, RuntimeBatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    InMemoryStoreBackendDefaults,
)
from great_expectations.core.expectation_configuration import ExpectationConfiguration
Create the GE Context
store_backend_defaults = InMemoryStoreBackendDefaults()
data_context_config = DataContextConfig(
    store_backend_defaults=store_backend_defaults,
    checkpoint_store_name=store_backend_defaults.checkpoint_store_name,
)
context = BaseDataContext(project_config=data_context_config)
Configure the Context Datasource
datasource_yaml = f"""
name: my_spark_dataframe
class_name: Datasource
execution_engine:
  class_name: SparkDFExecutionEngine
  force_reuse_spark_context: true
data_connectors:
  default_runtime_data_connector_name:
    class_name: RuntimeDataConnector
    batch_identifiers:
      - batch_id
"""
context.test_yaml_config(datasource_yaml)
context.add_datasource(**yaml.load(datasource_yaml))
Generate the DataFrame
df = (
    SparkSession.builder.appName('leo_test_load')
    .enableHiveSupport()
    .getOrCreate()
    .sql("select * from beta.click where dt = '20220518'")
)
Create the Batch Request
batch_request = RuntimeBatchRequest(
    datasource_name="my_spark_dataframe",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="asset",
    batch_identifiers={"batch_id": "default_identifier"},
    runtime_parameters={"batch_data": df},
)
Create the Suite
suite = context.create_expectation_suite(
    expectation_suite_name="test_suite", overwrite_existing=True
)
Create the Validator
validator = context.get_validator(
    batch_request=batch_request, expectation_suite_name="test_suite"
)
Set Validation Rules
Multiple rules can be set; a second rule is sketched right after the first example below.
kargs: Dict[str, Any] = {"column": "score"}
kargs["min_value"] = 0
kargs["max_value"] = 100
expectation = ExpectationConfiguration("expect_column_values_to_be_between", kargs)
validator.append_expectation(expectation)
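For example, a second, independent rule can be appended in the same way; here a not-null check on the same column (a minimal sketch; the expectation type name comes from the gallery):
not_null_expectation = ExpectationConfiguration(
    "expect_column_values_to_not_be_null", {"column": "score"}
)
validator.append_expectation(not_null_expectation)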
Run Validation and Print Results
print(validator.validate().results)
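If only the overall pass/fail flag is needed, the object returned by validate() also exposes it (a small sketch of the same call):
validation_result = validator.validate()
print(validation_result.success)   # True only if every expectation passed
print(validation_result.results)   # per-expectation details, as printed above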
Sample Result
[{
"result": {
"element_count": 3199059,
"unexpected_count": 322274,
"unexpected_percent": 10.212081775492893,
"partial_unexpected_list": [
106,
160,
107,
122,
344,
476,
378,
203,
106,
183,
101,
117,
106,
121,
110,
101,
109,
147,
242,
155
],
"missing_count": 43248,
"missing_percent": 1.3518975423710535,
"unexpected_percent_total": 10.074024892945081,
"unexpected_percent_nonmissing": 10.212081775492893
},
"exception_info": {
"raised_exception": false,
"exception_traceback": null,
"exception_message": null
},
"meta": {},
"suess": false,
"expectation_config": {
"kargs": {
"column": "itemindex",
"min_value": 0,
"max_value": 100,
"batch_id": "d628c5b9f32bdc923d4d1805503a477d"
},
"expectation_type": "expect_column_values_to_be_beteen",
"meta": {}
}
}]
DEMO
Run the demo locally:
from ruamel import yaml
from pyspark.sql import SparkSession
from typing import Any, Dict, List, Optional
import great_expectations as ge
from great_expectations.core.batch import BatchRequest, RuntimeBatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    InMemoryStoreBackendDefaults,
)
#from great_expectations.expectations.core.expectation_con import ExpectationConfiguration
from great_expectations.core.expectation_configuration import ExpectationConfiguration
# basic dataframe
data = [
    {"a": 1, "b": 2, "c": 3},
    {"a": 4, "b": 5, "c": 6},
    {"a": 7, "b": 8, "c": 9},
]
sparksession = (
    SparkSession.builder.master("local[1]")
    .appName('ge_demo')
    .getOrCreate()
)
df = sparksession.createDataFrame(data=data)
#df = sparksession.read.parquet("file:///Users/cl10189-m/greate_expectations/part-00000-2523d045-ab36-43ef-9b62-0101d0530cd9-c000.snappy.parquet")
# NOTE: InMemoryStoreBackendDefaults SHOULD NOT BE USED in normal settings. You
# may experience data loss as it persists nothing. It is used here for testing.
# Please refer to the docs to learn how to instantiate your DataContext.
store_backend_defaults = InMemoryStoreBackendDefaults()
data_context_config = DataContextConfig(
    store_backend_defaults=store_backend_defaults,
    checkpoint_store_name=store_backend_defaults.checkpoint_store_name,
)
context = BaseDataContext(project_config=data_context_config)
datasource_yaml = f"""
name: my_spark_dataframe
class_name: Datasource
execution_engine:
  class_name: SparkDFExecutionEngine
  force_reuse_spark_context: true
data_connectors:
  default_runtime_data_connector_name:
    class_name: RuntimeDataConnector
    batch_identifiers:
      - batch_id
"""
context.test_yaml_config(datasource_yaml)
context.add_datasource(**yaml.load(datasource_yaml))
# Here is a RuntimeBatchRequest using a dataframe
batch_request = RuntimeBatchRequest(
    datasource_name="my_spark_dataframe",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="ge_asset",  # This can be anything that identifies this data_asset for you
    batch_identifiers={"batch_id": "default_identifier"},
    runtime_parameters={"batch_data": df},  # Your dataframe goes here
)
suite = context.create_expectation_suite(
    expectation_suite_name="test_suite", overwrite_existing=True
)
validator = context.get_validator(
    batch_request=batch_request, expectation_suite_name="test_suite"
)
kargs: Dict[str, Any] = {"column": "a"}
kargs["min_value"] = 1
kargs["max_value"] = 6
expectation = ExpectationConfiguration("expect_column_values_to_be_between", kargs)
validator.append_expectation(expectation)
kargs: Dict[str, Any] = {"column": "b"}
expectation = ExpectationConfiguration("expect_column_values_to_be_unique", kargs)
validator.append_expectation(expectation)
print(validator.validate().results)
sparksession.stop()
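Assuming pyspark is installed and the code above is saved as, say, ge_demo.py (a hypothetical filename), it can be run directly:
python3 ge_demo.py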