App下載

Python 將自定義分析推送到 Netezza 數(shù)據(jù)庫(kù)

耳機(jī)依賴(lài)患者 2021-09-15 09:58:34 瀏覽數(shù) (2137)
反饋

Albury在本篇文章中,你將會(huì)了解 ibmdpy4nps 包是什么,以及它是如何幫助機(jī)器學(xué)習(xí)工程師和數(shù)據(jù)科學(xué)家在 Netezza 中執(zhí)行他們的自定義 ML 和分析功能。Netezza On-Prem 和 Netezza 云版本都支持此功能,提供跨平臺(tái)的無(wú)縫 One Netezza 體驗(yàn)。

介紹

為什么需要數(shù)據(jù)庫(kù)內(nèi)分析?

對(duì)數(shù)據(jù)庫(kù)內(nèi)分析的需求是眾所周知的。利用 MPP 硬件進(jìn)行更快處理的強(qiáng)大功能以及不將數(shù)據(jù)移出數(shù)據(jù)庫(kù)的便利性使得 ML 工程師和數(shù)據(jù)科學(xué)家在像 Netezza? 這樣的 MPP 數(shù)據(jù)庫(kù)中運(yùn)行分析非常有吸引力。

ibmdbpy4nps 與 Netezza 數(shù)據(jù)庫(kù)內(nèi)分析有何不同

Netezza 數(shù)據(jù)庫(kù)內(nèi)分析 (INZA) 是一個(gè)極其強(qiáng)大且全面的分析包,它提供了多個(gè)足以處理大多數(shù) ML 步驟的 SQL 例程。但是,用戶(hù)僅限于包中可用的內(nèi)容。例如,用戶(hù)可能希望使用他最喜歡的 Python ML 庫(kù)中可用的最新算法,而不是可用的 INZA 算法來(lái)解決 ML 問(wèn)題?;蛘咚赡芟雽?duì)數(shù)據(jù)集應(yīng)用一些自定義轉(zhuǎn)換,這在 INZA 中不可用。這就是 ibmdbpy4nps 包的用武之地。它構(gòu)建在 Netezza 分析可執(zhí)行技術(shù)之上,允許用戶(hù)通過(guò)簡(jiǎn)單的 Python 接口直接在數(shù)據(jù)庫(kù)內(nèi)執(zhí)行自定義 ML 代碼。使用 ibmdpy4nps,具有靈活性(用戶(hù)在數(shù)據(jù)庫(kù)內(nèi)執(zhí)行的內(nèi)容方面不受限制),以及速度(你仍然不在客戶(hù)端運(yùn)行它,而是在數(shù)據(jù)庫(kù)內(nèi)部運(yùn)行它)。用戶(hù)還可以使用 Pandas 數(shù)據(jù)框樣式的抽象連接到數(shù)據(jù)庫(kù)表,并將其用于數(shù)據(jù)探索(已包含多個(gè)標(biāo)準(zhǔn) Pandas 數(shù)據(jù)框操作的 SQL 轉(zhuǎn)換)。

架構(gòu)概述如下所示。

流動(dòng)

三個(gè)架構(gòu)層一起(如下所述)為用戶(hù)提供無(wú)縫體驗(yàn),將客戶(hù)端代碼作為數(shù)據(jù)庫(kù)內(nèi)的分析可執(zhí)行文件推送。用戶(hù)只需通過(guò) ibmdbpy4nps 包中提供的 Python 模塊調(diào)用與分析可執(zhí)行客戶(hù)端層交互。

  • 客戶(hù)端層——提供用戶(hù)可以使用所需參數(shù)導(dǎo)入和調(diào)用的 Python 模塊。
  • 生成層 — 將客戶(hù)端的 Python 調(diào)用轉(zhuǎn)換為基于分析可執(zhí)行文件的代碼,并添加調(diào)用預(yù)先存在的用戶(hù)定義函數(shù)所需的 SQL。
  • 執(zhí)行層 — 提供預(yù)先存在的用戶(hù)定義函數(shù),執(zhí)行 SQL,并運(yùn)行處理執(zhí)行的分析可執(zhí)行啟動(dòng)程序。

這個(gè)包為數(shù)據(jù)庫(kù)內(nèi)分析提供了什么

  1. 允許用戶(hù)使用 Pandas 風(fēng)格的數(shù)據(jù)框抽象連接到數(shù)據(jù)庫(kù)表。
  2. 支持使用內(nèi)置 SQL 轉(zhuǎn)換(再次使用 Pandas 數(shù)據(jù)框樣式)的數(shù)據(jù)庫(kù)內(nèi)數(shù)據(jù)探索。
  3. 支持直接在數(shù)據(jù)庫(kù)中執(zhí)行自定義 ML 代碼(通過(guò)簡(jiǎn)單的 Python 接口)

用例

場(chǎng)景

雖然在數(shù)據(jù)庫(kù)中推送任何自定義代碼在技術(shù)上是可行的,但它可能并非一直都是最佳的。Netezza 以其大規(guī)模并行處理能力而聞名。創(chuàng)建表時(shí),數(shù)據(jù)通常分布在不同的數(shù)據(jù)切片上,并且在這些數(shù)據(jù)切片(在工作節(jié)點(diǎn)上)中的每一個(gè)上并行執(zhí)行 SQL 操作,然后再將它們聚合到主機(jī)上。當(dāng)用例旨在利用 Netezza 的并行性時(shí),這是理想的選擇。一些示例場(chǎng)景:

  1. 假設(shè)你需要對(duì)數(shù)據(jù)庫(kù)表的每條記錄應(yīng)用自定義數(shù)據(jù)轉(zhuǎn)換。這不需要在應(yīng)用例程之前在一個(gè)地方聚合數(shù)據(jù),因此它可以在不同的數(shù)據(jù)切片上并行執(zhí)行。這是一個(gè)最佳方案。
  2. 假設(shè)你需要針對(duì)整個(gè)數(shù)據(jù)集構(gòu)建 ML 模型。這要求在應(yīng)用模型構(gòu)建代碼之前將數(shù)據(jù)聚合在一處。這限制了 Netezza 并行運(yùn)行事物的范圍。這不是最佳方案。
  3. 假設(shè)你的數(shù)據(jù)可以分區(qū),你的目標(biāo)是為每個(gè)分區(qū)構(gòu)建 ML 模型,每個(gè)分區(qū)都是一個(gè)獨(dú)立的數(shù)據(jù)集。在應(yīng)用模型構(gòu)建例程之前,這不需要將數(shù)據(jù)聚合在一處。你可以通過(guò)為每個(gè)分區(qū)并行構(gòu)建模型來(lái)利用 Netezza 并行性。這是一個(gè)最佳方案。
  4. 假設(shè)你需要探索數(shù)據(jù),例如收集數(shù)據(jù)集的統(tǒng)計(jì)信息。這要求在應(yīng)用例程之前將數(shù)據(jù)聚合在一處,并限制 Netezza 并行運(yùn)行事物的范圍。這不是最佳方案。然而,鑒于這是一項(xiàng)常見(jiàn)任務(wù),我們已經(jīng)為大多數(shù)使用 Pandas 數(shù)據(jù)幀抽象的數(shù)據(jù)探索操作提供了數(shù)據(jù)庫(kù)內(nèi) SQL 實(shí)現(xiàn),用戶(hù)可以從中受益。

安裝

  1. 在客戶(hù)端(在 Python 環(huán)境中):使用?pip install ibmdbpy4nps?.
  2. 在服務(wù)器端(Netezza 服務(wù)器):安裝從 11.2.1.0 開(kāi)始的任何 INZA 版本:INZA 自定義(AE 啟動(dòng)器、模板、預(yù)注冊(cè)的 SQL 函數(shù)等)在 11.2.1.0 開(kāi)始的 INZA 版本中可用以支持 ibmdbpy4nps 包調(diào)用。
  3. 在客戶(hù)端設(shè)置 ODBC 或 JDBC 連接

ODBC 連接:按照安裝和配置 ODBC 部分中的步驟設(shè)置和配置 ODBC 數(shù)據(jù)源連接。

例如,下圖(在 Windows 中)顯示了為“weather”數(shù)據(jù)庫(kù)設(shè)置的數(shù)據(jù)源“weather”。 

流動(dòng)

JDBC 連接:IBM 知識(shí)中心在安裝和配置 JDBC部分中描述了如何在你的客戶(hù)端上安裝 Netezza JDBC 驅(qū)動(dòng)程序。下載并安裝文件 ?nzjdbc3.jar? 后,你必須將其位置包含在 CLASSPATH 環(huán)境變量的值中:

export CLASSPATH=<path-to-nzjdbc3.jar>:$CLASSPATH

連接到數(shù)據(jù)庫(kù)

用戶(hù)可以訪問(wèn)具有數(shù)據(jù)框抽象的數(shù)據(jù)庫(kù)表,如下所示:

導(dǎo)入包

from ibmdbpy4nps, `import IdaDataBase, IdaDataFrame

連接數(shù)據(jù)庫(kù)(weather是數(shù)據(jù)源名稱(chēng))

idadb = IdaDataBase('weather', 'admin', 'password')

連接到數(shù)據(jù)庫(kù)內(nèi)的表(WEATHER 是數(shù)據(jù)庫(kù)表名)

idadf = IdaDataFrame(idadb, 'WEATHER')

注意:以下是將天氣 CSV加載到 Netezza 服務(wù)器上的天氣表中的步驟。

  1. 創(chuàng)建一個(gè)表:
  2. create table weather (date Date, Location VARCHAR(100), MinTemp REAL, MaxTemp REAL, Rainfall REAL, Evaporation REAL,Sunshine REAL, WindGustDir VARCHAR(20), WindGustSpeed INTEGER, WindDir9am VARCHAR(10), WindDir3pm VARCHAR(10), WindSpeed9am INTEGER, WindSpeed3pm INTEGER, Humidity9am INTEGER, Humidity3pm INTEGER, Pressure9am REAL, Pressure3pm REAL, cloud9am VARCHAR(10), cloud3pm VARCHAR(10), Temp9am REAL, Temp3pm REAL, RainToday VARCHAR(10), RISK_MM REAL, RainTomorrow VARCHAR(10));
  3. 將 CSV 加載到表中:
  4. nzload -df weatherAUS.csv -t weather -db weather -pw password -nullValue NA -boolStyle Yes_No -skipRows 1 -delim , -dateStyle MDY -dateDelim '/' 
  5. 對(duì)于沒(méi)有標(biāo)識(shí)列的表(如上面的天氣表),創(chuàng)建一個(gè)標(biāo)識(shí)列并將值設(shè)置為rowid: 
  6.  alter table weather add column id bigint; update weather set id=rowid; 

使用內(nèi)置 SQL 翻譯探索數(shù)據(jù)

本節(jié)對(duì)應(yīng)于第四個(gè)用例,其中用戶(hù)有興趣探索或收集有關(guān)數(shù)據(jù)的一些統(tǒng)計(jì)信息。ibmdbpy4nps 包構(gòu)建在 ibmdbpy 包之上,并支持多個(gè)內(nèi)置的 Pandas 風(fēng)格的數(shù)據(jù)幀操作。下面顯示了一些示例。

?idadf.head()?— 返回前n行

數(shù)字

?idadf.describe()? — 返回列的各種統(tǒng)計(jì)信息

數(shù)字

?idadf.corr()? — 返回列的成對(duì)相關(guān)性

數(shù)字

上面的代碼片段使用相關(guān)值生成可視化熱圖(就像本地 pandas 一樣df.corr())。

在數(shù)據(jù)庫(kù)內(nèi)執(zhí)行自定義分析/機(jī)器學(xué)習(xí)功能

在本節(jié)中,你將看到如何在數(shù)據(jù)庫(kù)中執(zhí)行自定義 ML 代碼(用例 1-3)。

NZInstall – 在 Netezza 上安裝軟件包

實(shí)用程序開(kāi)發(fā)人員需要的一項(xiàng)重要功能是能夠在其 ML 函數(shù)中使用 Python 包之前安裝它們。NZInstall 對(duì)此有所幫助。它接受一個(gè)包名并返回一個(gè)輸出代碼來(lái)指示是否安裝了包。輸出 0 表示軟件包已成功安裝在 Netezza 上。

from ibmdbpy4nps.ae.install import NZInstall
# specify the package_name depending on your requirement
package_name=’pandas’ 
idadb = IdaDataBase('weather', 'admin', 'password')
nzinstall = NZInstall(idadb, package_name)
result = nzinstall.getResultCode()

NZFunApply

對(duì)表數(shù)據(jù)的每一行應(yīng)用函數(shù)。這對(duì)應(yīng)于第一個(gè)用例,其中用戶(hù)有興趣在數(shù)據(jù)庫(kù)表的每條記錄上應(yīng)用自定義數(shù)據(jù)轉(zhuǎn)換。

示例場(chǎng)景:將天氣表中的 MAXTEMP 列從攝氏度轉(zhuǎn)換為華氏度

默認(rèn)情況下self,你要執(zhí)行的用戶(hù)函數(shù)可以采用兩個(gè)參數(shù):,表示分析可執(zhí)行上下文,以及x,表示表的行。你可以使用x對(duì)選定的列進(jìn)行操作。在下面的示例中,x[2]檢索并進(jìn)一步處理第三列 ( ) 以生成新值。對(duì)于你希望生成為輸出的列,將它們構(gòu)建為列表并用于?self.output?填充結(jié)果?;蛘?,如果你只有一個(gè)結(jié)果,你可以直接將其發(fā)送到?self.output?。

from ibmdbpy4nps import IdaDataBase, IdaDataFrame
from ibmdbpy4nps.ae import  NZFunApply

idadb = IdaDataBase('weather', 'admin', 'password', verbose=True)

idadf = IdaDataFrame(idadb, 'WEATHER')


def apply_fun(self, x):
            from math import sqrt
            max_temp = x[3]
            id = x[24]
            fahren_max_temp = (max_temp*1.8)+32
            row = [id, max_temp,  fahren_max_temp]
            self.output(row)

output_signature = {'ID': 'int', 'MAX_TEMP': 'float', 'FAHREN_MAX_TEMP': 'float'}
nz_apply = NZFunApply(df=idadf, fun_ref = code_str_apply, output_table="temp_conversion",output_signature=output_signature, merge_output_with_df=True)
result_idadf = nz_apply.get_result()
result = result_idadf.as_dataframe()
idadb.drop_table(result_idadf.tablename)
print(result)

在無(wú)法將函數(shù)作為引用發(fā)送的筆記本環(huán)境中,請(qǐng)將函數(shù)用引號(hào)括起來(lái)并將其分配給字符串變量。如果你的函數(shù)代碼以字符串形式發(fā)送,則你還必須在NZApply調(diào)用中提及函數(shù)名稱(chēng)。下面提供了一個(gè)示例。

注意:要為后端 SQL 函數(shù)生成縮進(jìn)的分析可執(zhí)行代碼,你需要在引號(hào)后立即使用函數(shù)名稱(chēng),但不要在下一行。

from ibmdbpy4nps import IdaDataBase, IdaDataFrame
from ibmdbpy4nps.ae import  NZFunApply

idadb = IdaDataBase('weather', 'admin', 'password', verbose=True)

idadf = IdaDataFrame(idadb, 'WEATHER')

code_str_apply = """def apply_fun(self, x):
    from math import sqrt
    max_temp = x[3]
    id = x[24]
    fahren_max_temp = (max_temp*1.8)+32
    row = [id, max_temp,  fahren_max_temp]
    self.output(row)
    """
output_signature = {'ID': 'int', 'MAX_TEMP': 'float', 'FAHREN_MAX_TEMP': 'float'}
nz_apply = NZFunApply(df=idadf, code_str=code_str_apply, fun_name='apply_fun', output_table="temp_conversion",output_signature=output_signature, merge_output_with_df=True)
result_idadf = nz_apply.get_result()
result = result_idadf.as_dataframe()
idadb.drop_table(result_idadf.tablename)
print(result)

預(yù)期結(jié)果(142193 行 x 27 列):

最高溫度 FAHREN_MAX_TEMP 日期 風(fēng)險(xiǎn)_MM 明天下雨 ID
0 27.700001 81.860001 2008-12-16 0.0 448014
1 33.000000 91.400002 2008-12-22 0.0 448020
2 32.700001 90.860001 2008-12-28 0.0 448026
3 28.799999 83.839996 2009-01-03 0.0 448032
4 28.400000 83.120003 2009-01-09 0.0 448038
142188 32.900002 91.220001 2015-12-02 0.0 589631
142189 37.099998 98.779999 2015-12-08 0.2 589637
142190 39.500000 103.099998 2015-12-14 3.8 是的 589643
142191 30.299999 86.540001 2015-12-20 4.8 是的 589649
142192 36.299999 97.339996 2016-02-06 0.0 589692

NZFunT申請(qǐng)

這對(duì)應(yīng)于用戶(hù)有興趣執(zhí)行復(fù)雜功能的第二個(gè)用例——例如,針對(duì)整個(gè)可用行構(gòu)建 ML 模型。這將導(dǎo)致我們?cè)谝韵虏糠种忻枋龅膬煞N變體,針對(duì)切片數(shù)據(jù)構(gòu)建模型或針對(duì)整個(gè)表數(shù)據(jù)構(gòu)建模型。

a) 對(duì)數(shù)據(jù)的每個(gè)切片應(yīng)用該函數(shù)

數(shù)據(jù)切片是表數(shù)據(jù)的一部分。Netezza 根據(jù)創(chuàng)建表時(shí)指定的列將數(shù)據(jù)分配到不同的切片。如果未指定列,則第一列將被視為分布列。因此,在使用NZFunTApply. 否則,你可能會(huì)在將函數(shù)應(yīng)用于你不想要的切片時(shí)產(chǎn)生意外結(jié)果。

示例場(chǎng)景:轉(zhuǎn)換數(shù)據(jù),構(gòu)建 ML 模型,并測(cè)量模型的準(zhǔn)確性

讓我們使用天氣數(shù)據(jù)集編寫(xiě)一個(gè)函數(shù)來(lái)解決上述情況。天氣數(shù)據(jù)集包含來(lái)自澳大利亞多個(gè)地點(diǎn)的 10 年每日天氣觀測(cè)數(shù)據(jù)。數(shù)據(jù)集中有 48 個(gè)唯一位置,并且RainTomorrow是要預(yù)測(cè)的目標(biāo)變量(第二天下雨了嗎?)。次日降雨預(yù)測(cè)是通過(guò)對(duì)目標(biāo)變量 訓(xùn)練分類(lèi)模型來(lái)完成的RainTomorrow。我們的目標(biāo)是編寫(xiě)一個(gè)函數(shù)來(lái)執(zhí)行三個(gè)步驟:轉(zhuǎn)換數(shù)據(jù)(通過(guò)為空值分配默認(rèn)值來(lái)估算列),構(gòu)建 ML 模型(為轉(zhuǎn)換后的數(shù)據(jù)構(gòu)建決策樹(shù)分類(lèi)器),然后測(cè)量準(zhǔn)確性(計(jì)算三倍的 CV 準(zhǔn)確度分?jǐn)?shù))。

請(qǐng)注意,該函數(shù)默認(rèn)獲取兩個(gè)參數(shù):(self表示分析可執(zhí)行上下文)和df(傳入切片數(shù)據(jù)的數(shù)據(jù)幀)。使用?self.output?打印數(shù)據(jù)集的結(jié)果大小、第一條記錄的位置和準(zhǔn)確度。

from ibmdbpy4nps import IdaDataBase, IdaDataFrame
from ibmdbpy4nps.ae import  NZFunTApply

idadb = IdaDataBase('weather', 'admin', 'password', verbose=True)

idadf = IdaDataFrame(idadb, 'WEATHER')

code_str_host_spus="""def decision_tree_ml(self, df):
    from sklearn.model_selection import cross_val_score
    from sklearn.impute import SimpleImputer
    from sklearn.tree import DecisionTreeClassifier

    from sklearn.preprocessing import LabelEncoder
    import numpy as np

    location = df.LOCATION[0]

    # data preparation
    imputed_df = df.copy()
    ds_size = len(imputed_df)
    imputed_df['CLOUD9AM'] = imputed_df.CLOUD9AM.astype('str')
    imputed_df['CLOUD3PM'] = imputed_df.CLOUD3PM.astype('str')
    imputed_df['SUNSHINE'] = imputed_df.SUNSHINE.astype('float')
    imputed_df['EVAPORATION'] = imputed_df.EVAPORATION.astype('float')


    #remove columns which have only null values
    columns = imputed_df.columns
    for column in columns:
        if imputed_df[column].isnull().sum()==len(imputed_df):
            imputed_df=imputed_df.drop(column, 1)

    columns = imputed_df.columns

    for column in columns:

        if (imputed_df[column].dtype == 'float64' or imputed_df[column].dtype == 'int64'):
            imp = SimpleImputer(missing_values=np.nan, strategy='mean')
            imputed_df[column] = imp.fit_transform(imputed_df[column].values.reshape(-1, 1))

        if (imputed_df[column].dtype == 'object'):
            # impute missing values for categorical variables
            imp = SimpleImputer(missing_values=None, strategy='constant', fill_value='missing')
            imputed_df[column] = imp.fit_transform(imputed_df[column].values.reshape(-1, 1))
            imputed_df[column] = imputed_df[column].astype('str')
            le = LabelEncoder()
            #print(imputed_df[column].unique())

            le.fit(imputed_df[column].unique())
            # print(le.classes_)
            imputed_df[column] = le.transform(imputed_df[column])



    X = imputed_df.drop(['RISK_MM', 'RAINTOMORROW'], axis=1)
    y = imputed_df['RAINTOMORROW']

    # Create a decision tree
    dt = DecisionTreeClassifier(max_depth=5)

    cvscores_3 = cross_val_score(dt, X, y, cv=3)

    self.output(ds_size, location, np.mean(cvscores_3))
"""

由于我們要在每個(gè)數(shù)據(jù)切片上應(yīng)用該函數(shù),所以我們將?parallel=True?在模塊調(diào)用中進(jìn)行選擇。

output_signature = {'DATASET_SIZE': 'int', 'LOCATION':'str', 'CLASSIFIER_ACCURACY':'double'}

nz_fun_tapply = NZFunTApply(df=idadf, code_str=code_str_host_spus, fun_name ="decision_tree_ml", parallel=True, output_signature=output_signature)
result = nz_fun_tapply.get_result()
result_idadf = nz_apply.get_result()
result = result_idadf.as_dataframe()
idadb.drop_table(result_idadf.tablename)
print(result)

請(qǐng)注意,我們?cè)诮Y(jié)果中有多行(對(duì)應(yīng)于創(chuàng)建表時(shí)分配的數(shù)據(jù)切片的數(shù)量)。第一列是數(shù)據(jù)集的大小,第二列是數(shù)據(jù)集中第一條記錄的位置,第三列是構(gòu)建的分類(lèi)器的準(zhǔn)確率。

數(shù)據(jù)集_大小 地點(diǎn) 分類(lèi)器_準(zhǔn)確度
0 23673 Albury 0.824822
1 23734 Albury
0.827126
2 23686 Albury
0.813898
3 23706 Albury
0.818485
4 23739 Albury
0.832175
5 23655 Albury
0.826168

b) 應(yīng)用于數(shù)據(jù)集的函數(shù)

您可能希望將該函數(shù)應(yīng)用于整個(gè)數(shù)據(jù)集而不是切片。這意味著在應(yīng)用該功能之前,需要將數(shù)據(jù)聚合到一個(gè)地方。這不是最佳方案,但parallel=False為了完整起見(jiàn),我們確實(shí)提供了此選項(xiàng)(使用)。

output_signature = {'DATASET_SIZE': 'int', 'LOCATION':'str', 'CLASSIFIER_ACCURACY':'double'}
nz_fun_tapply = NZFunTApply(df=idadf, code_str=code_str_host_spus, fun_name ="decision_tree_ml", parallel=False, output_signature=output_signature)
result = nz_fun_tapply.get_result()
result_idadf = nz_apply.get_result()
result = result_idadf.as_dataframe()
idadb.drop_table(result_idadf.tablename)
print(result)

因?yàn)槲覀冞x擇?parallel=False?在完整數(shù)據(jù)集上執(zhí)行該函數(shù),所以你只能在結(jié)果中看到一行。第一列是數(shù)據(jù)集的大小,第二列是數(shù)據(jù)集中第一條記錄的位置,第三列是構(gòu)建的分類(lèi)器的準(zhǔn)確率。

數(shù)據(jù)集_大小 地點(diǎn) 分類(lèi)器_準(zhǔn)確度
142193 Albury
0.827115

注意:對(duì)于復(fù)雜的功能,最好先在客戶(hù)端測(cè)試,然后再在服務(wù)器上運(yùn)行??梢允褂眠x擇查詢(xún)(?select * from weather limit 1000?)下載一個(gè)小的子集(比如 1,000 條記錄),在客戶(hù)端數(shù)據(jù)幀上測(cè)試該函數(shù),然后將該函數(shù)推送到服務(wù)器以針對(duì)整個(gè)數(shù)據(jù)集執(zhí)行。

NZFunGroupedApply

根據(jù)用戶(hù)的選擇在運(yùn)行時(shí)計(jì)算的每個(gè)分區(qū)上應(yīng)用該函數(shù)。

本節(jié)對(duì)應(yīng)于第四個(gè)用例,其中用戶(hù)有興趣執(zhí)行復(fù)雜的功能,例如為他選擇的分區(qū)構(gòu)建 ML 模型。

在這種情況下,你的函數(shù)將應(yīng)用于運(yùn)行你提供給?NZFunGroupedApply?的輸入列(索引參數(shù))計(jì)算的每個(gè)分區(qū)。Netezza 數(shù)據(jù)切片將在運(yùn)行時(shí)重新生成,以便每個(gè)切片僅包含一組或多組指定的列。由于這不需要在應(yīng)用該函數(shù)之前將數(shù)據(jù)聚合在一處,因此這是一種最佳方案。此外,現(xiàn)實(shí)世界的 ML 設(shè)置需要在定義組時(shí)進(jìn)行控制。因此,這是利用 Netezza 并行性處理復(fù)雜 ML 功能的最推薦選項(xiàng)。每個(gè)組/分區(qū)都被視為獨(dú)立的數(shù)據(jù)集,并且針對(duì)這些分區(qū)并行執(zhí)行該函數(shù)。

雖然?NZFunTApply?也將函數(shù)應(yīng)用于并行數(shù)據(jù)切片 ( ?parallel=True?) 選項(xiàng),但有一些區(qū)別:

  • ?NZFunTApply ?使用靜態(tài)切片,這意味著這些切片可能不是你的場(chǎng)景所需的數(shù)據(jù)排列。
  • 該函數(shù)是針對(duì)整個(gè)切片執(zhí)行的,而不是針對(duì)切片中的組執(zhí)行的。
示例場(chǎng)景:轉(zhuǎn)換數(shù)據(jù)、構(gòu)建 ML 模型并為模型評(píng)分

讓我們?cè)俅问褂锰鞖鈹?shù)據(jù)集來(lái)編寫(xiě)解決上述情況的函數(shù)。請(qǐng)記住,數(shù)據(jù)集中有 48 個(gè)唯一位置(產(chǎn)生 48 個(gè)分區(qū))并且?RainTomorrow?是要預(yù)測(cè)的目標(biāo)變量。我們的目標(biāo)是編寫(xiě)一個(gè)函數(shù)來(lái)為每個(gè)分區(qū)執(zhí)行三個(gè)步驟:轉(zhuǎn)換數(shù)據(jù)(通過(guò)為空值分配默認(rèn)值來(lái)估算列),構(gòu)建 ML 模型(為轉(zhuǎn)換后的數(shù)據(jù)構(gòu)建決策樹(shù)分類(lèi)器),然后評(píng)分模型(預(yù)測(cè)?RAINTOMORROW?的值)。

請(qǐng)注意,該函數(shù)默認(rèn)獲取兩個(gè)參數(shù):self,代表分析可執(zhí)行上下文)和df(傳入切片數(shù)據(jù)的數(shù)據(jù)幀);結(jié)果 ID、數(shù)據(jù)集大小、第一條記錄的位置列值和預(yù)測(cè)值用 打印?self.output?。

from ibmdbpy4nps import IdaDataBase, IdaDataFrame
from ibmdbpy4nps.ae import  NZFunGroupedApply

idadb = IdaDataBase('weather', 'admin', 'password', verbose=True)

idadf = IdaDataFrame(idadb, 'WEATHER')

code_str_host_spus="""def decision_tree_ml(self, df):
            from sklearn.model_selection import cross_val_score
            from sklearn.impute import SimpleImputer
            from sklearn.tree import DecisionTreeClassifier
            from sklearn.model_selection import train_test_split

            from sklearn.preprocessing import LabelEncoder
            import numpy as np



            # data preparation
            imputed_df = df.copy()
            ds_size = len(imputed_df)
            temp_dict = dict()


            columns = imputed_df.columns

            for column in columns:
                if column=='ID':
                    continue

                if (imputed_df[column].dtype == 'float64' or imputed_df[column].dtype == 'int64'):
                  if imputed_df[column].isnull().sum()==len(imputed_df):
                     imputed_df[column] = imputed_df[column].fillna(0)

                  else :

                     imp = SimpleImputer(missing_values=np.nan, strategy='mean')
                     transformed_column = imp.fit_transform(imputed_df[column].values.reshape(-1, 1))         
                     imputed_df[column] = transformed_column

                if (imputed_df[column].dtype == 'object'):
                    # impute missing values for categorical variables
                    imp = SimpleImputer(missing_values=None, strategy='constant', fill_value='missing')
                    imputed_df[column] = imp.fit_transform(imputed_df[column].values.reshape(-1, 1))
                    imputed_df[column] = imputed_df[column].astype('str')
                    le = LabelEncoder()

                    le.fit(imputed_df[column])
                    # print(le.classes_)
                    imputed_df[column] = le.transform(imputed_df[column])
                    temp_dict[column] = le



            # Create a decision tree
            dt = DecisionTreeClassifier(max_depth=5)
            X = imputed_df.drop(['RISK_MM', 'RAINTOMORROW'], axis=1)
            y = imputed_df['RAINTOMORROW']
            X_train, X_test, y_train, y_test = train_test_split(X,y, test_size = 0.25, random_state=42, stratify=y)


            dt.fit(X_train, y_train)

            accuracy = dt.score(X_test, y_test)    

            pred_df = X_test.copy()

            y_pred= dt.predict(X_test)

            pred_df['RAINTOMORROW'] = y_pred
            pred_df['DATASET_SIZE'] = ds_size
            pred_df['CLASSIFIER_ACCURACY']=round(accuracy,2)

            original_columns = pred_df.columns

            for column in original_columns:

             if column in temp_dict:   
               pred_df[column] = temp_dict[column].inverse_transform(pred_df[column])
               #print(pred_df)

            def print_output(x):
                row = [x['ID'], x['RAINTOMORROW'], x['DATASET_SIZE'], x['CLASSIFIER_ACCURACY']]
                self.output(row)


            pred_df.apply(print_output, axis=1)


"""


output_signature = {'ID':'int', 'RAINTOMORROW_PRED' :'str',  'DATASET_SIZE':'int', 'CLASSIFIER_ACCURACY':'float'}

nz_groupapply = NZFunGroupedApply(df=idadf,  code_str=code_str_host_spus, index='LOCATION', fun_name="decision_tree_ml", output_signature=output_signature, merge_output_with_df=True)

result_idadf = nz_apply.get_result()
result = result_idadf.as_dataframe()
idadb.drop_table(result_idadf.tablename)
print(result)

你應(yīng)該會(huì)看到如下所示的結(jié)果。請(qǐng)注意,結(jié)果列已通過(guò)?merge_output_with_df=True?選項(xiàng)與原始 df 列合并。

數(shù)字

Netezza安裝

在 Netezza 上安裝軟件包。實(shí)用程序開(kāi)發(fā)人員需要的一項(xiàng)重要功能是能夠在其 ML 函數(shù)中使用 Python 包之前安裝它們。NZInstall對(duì)此有幫助。它接受一個(gè)包名并返回一個(gè)輸出代碼來(lái)指示是否安裝了包。輸出 0 表示軟件包已成功安裝在 Netezza 上。

例子:

from ibmdbpy4nps.ae.install import NZInstall
idadb = IdaDataBase('weather', 'admin', 'password')
nzinstall = NZInstall(idadb, package_name)
result = nzinstall.getResultCode()

結(jié)論

在本文中,我們展示了如何將自定義 ML 推送到 Netezza(無(wú)論是內(nèi)部部署還是云版本)。如示例中所示,使用 ibmdbpy4nps,用戶(hù)將能夠在數(shù)據(jù)庫(kù)中無(wú)縫運(yùn)行他們的自定義代碼,就像在他們最喜歡的 IDE 或 Notebook 環(huán)境中運(yùn)行一樣。這為用戶(hù)提供了數(shù)據(jù)庫(kù)內(nèi)分析的性能、不將數(shù)據(jù)移出數(shù)據(jù)庫(kù)的便利性以及編寫(xiě)自定義 Python 函數(shù)的靈活性。我們得出的結(jié)論是,如果出現(xiàn)以下情況,用戶(hù)應(yīng)該考慮針對(duì)他們的用例使用下推方法:

  • 數(shù)據(jù)有分區(qū),如果每個(gè)分區(qū)都被視為一個(gè)獨(dú)立的數(shù)據(jù)集。
  • 模型構(gòu)建或分析需要在此類(lèi)分區(qū)數(shù)據(jù)集上并行執(zhí)行。
  • 分區(qū)計(jì)數(shù)足夠高(大于或等于 Netezza 配置中的工作節(jié)點(diǎn)數(shù))以利用并行性。


0 人點(diǎn)贊