Metadata-Version: 2.1
Name: datafushion-plugins-sparkpy
Version: 1.0.0
Summary: DataFushion的SparkPy算法插件
Home-page: UNKNOWN
Author: 肖林朋
Author-email: 1553990434@qq.com
License: XiaoLinpeng Licence
Platform: any
Description-Content-Type: text/markdown
Requires-Dist: pyspark
Requires-Dist: datafushion-plugins-python

# DataFushion_Plugins_SparkPy说明

## 1.简介

针对Spark的Python版本算法(pyspark)在DataFushion平台使用所给出的插件,主要用于规范化算法的输入输出

## 2.使用

- [x] Step1:引入datafushion_spark包中的operation模块
- [x] Step2:使用资源管理器进行数据拆解处理,并在其中实现自己需要实现的业务算法逻辑

```python
from datafushion_spark import operation, HandleDataFrameSet, HandleInputDataStruct, DataFrame, SparkSession, \
    FileExtractFormatEnum


if __name__ == '__main__':
    print("start")
    with operation(app_name="test", master="local") as destruction:  # type: HandleDataFrameSet
        input_data_struct_list = destruction.input_data_struct_list
        param_map = destruction.param_map
        spark = destruction.spark  # type:SparkSession

        data_result = None  # type DataFrame

        # 算法逻辑部分
        for index, input_data_struct in enumerate(input_data_struct_list):  # type: HandleInputDataStruct
            # 注意:此时的DataFrame的列名已经是映射过的列名,可以直接使用
            data_list = input_data_struct.data_list  # type: DataFrame
            if index == 0:
                data_result = data_list.groupby("status").agg({
                    "power": "mean"
                }).withColumnRenamed("avg(power)", "powerAvg")
            else:
                data_result = data_result.union(data_list.groupby("status").agg({
                    "power": "mean"
                }).withColumnRenamed("avg(power)", "powerAvg"))

        # 保存最终结果
        destruction.data_result = data_result
        # 保存存储的格式,需要与打包的配置文件对应
        destruction.output_type = FileExtractFormatEnum.JSON.value
```

注意:

------

destruction为解构的`HandleDataFrameSet`实体类

------

input_data_struct_list中包含了输入数据的封装,其类型为List

其元素为`HandleInputDataStruct`类,包含的属性为file_type,file_path,file_input_mapping,data_list

算法需要使用的是file_input_mapping和data_list

data_list是输入数据的`DataFrame`

file_input_mapping为输入数据字段的映射

------

param_map为算法的参数字典

------

在对数据进行业务算法处理完成后,需要将拆解的destruction中的data_result属性赋值为业务算法的最终数据结果

------

在对数据进行业务算法处理完成后,需要将拆解的destruction中的output_type属性赋值为业务算法需要输出的文件格式`FileExtractFormatEnum.JSON.value`中提供了`JSON,CSV,PARQUET,GENERAL`四类格式

------

目前`PARQUET`类的输出格式只支持作为Spark类型的算法积木中的输入

