Topics covered in this article:

1 - Why generic functions (GenericUDF) are needed

2 - A simple generic function example

3 - Global functions

1 - Why generic functions (GenericUDF) are needed

Generic functions (GenericUDF) exist to handle situations where argument types vary at runtime and a standard UDF cannot match every combination one by one. Take NULL handling as an example: checking whether a value is NULL and substituting a default applies to every column type, so a standard UDF would have to repeat essentially the same method for each type. A generic function solves this neatly by resolving the argument types once, at query compile time.
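
To make the limitation concrete, here is a minimal sketch of what a NULL-replacement function looks like with the plain UDF interface. The class PlainNvl and its overloads are hypothetical and written only for illustration: every argument type needs its own evaluate overload, and complex types such as arrays, maps and structs are hard to cover at all.

package hive.function.generic;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Hypothetical plain UDF: every argument type we want to support needs
// yet another evaluate overload, which quickly becomes unmanageable.
public class PlainNvl extends UDF {

  public Text evaluate(Text value, Text defaultValue) {
    return value == null ? defaultValue : value;
  }

  public IntWritable evaluate(IntWritable value, IntWritable defaultValue) {
    return value == null ? defaultValue : value;
  }

  // ... and so on for DoubleWritable, dates, decimals, arrays, maps, structs ...
}

A GenericUDF avoids this by receiving ObjectInspectors in initialize and resolving a common return type once, as the example in the next section shows.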

2 - A simple generic function example

package hive.function.generic;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;

import org.apache.hadoop.hive.ql.metadata.HiveException;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

@Description(
    name = "nvl",
    value = "_FUNC_(value,default_value) - Returns default value " +
        "if value is null else returns value",
    extended = "Example:\n" +
        "  > SELECT _FUNC_(null,'bla') FROM src LIMIT 1;\n"
)

public class genericNvl extends GenericUDF {

  private GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver;
  private ObjectInspector[] argumentOIs;

  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    argumentOIs = arguments;
    if (arguments.length != 2) {
      throw new UDFArgumentLengthException(
          "The operator NVL accepts 2 arguments.");
    }

    returnOIResolver = new GenericUDFUtils.ReturnObjectInspectorResolver(true);
    if (!(returnOIResolver.update(arguments[0]) && returnOIResolver.update(arguments[1]))) {
      throw new UDFArgumentTypeException(2,
          "The 1st and 2nd args of function NVL should have the same type, " +
          "but they are different: \"" + arguments[0].getTypeName() +
          "\" and \"" + arguments[1].getTypeName() + "\"");
    }

    return returnOIResolver.get();
  }

  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    Object retVal = returnOIResolver.convertIfNecessary(arguments[0].get(), argumentOIs[0]);
    if (retVal == null) {
      retVal = returnOIResolver.convertIfNecessary(arguments[1].get(), argumentOIs[1]);
    }
    return retVal;
  }

  @Override
  public String getDisplayString(String[] children) {
    StringBuilder sb = new StringBuilder();
    sb.append("if ");
    sb.append(children[0]);
    sb.append(" is null ");
    sb.append(" returns ");
    sb.append(children[1]);
    return sb.toString();
  }
}

returnOIResolver.update determines whether the two arguments can be resolved to a single common (convertible) type:

/**
 * Update returnObjectInspector and valueInspectorsAreTheSame based on the
 * ObjectInspector seen.
 *
 * @return false if there is a type mismatch
 */
private boolean update(ObjectInspector oi, boolean isUnionAll) throws UDFArgumentTypeException {
  if (oi instanceof VoidObjectInspector) {
    return true;
  }

  if (returnObjectInspector == null) {
    // The first argument, just set the return to be the standard
    // writable version of this OI.
    returnObjectInspector = ObjectInspectorUtils.getStandardObjectInspector(oi,
        ObjectInspectorCopyOption.WRITABLE);
    return true;
  }

  if (returnObjectInspector == oi) {
    // The new ObjectInspector is the same as the old one, directly return
    // true
    return true;
  }

  TypeInfo oiTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(oi);
  TypeInfo rTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(returnObjectInspector);
  if (oiTypeInfo == rTypeInfo) {
    // Convert everything to writable, if types of arguments are the same,
    // but ObjectInspectors are different.
    returnObjectInspector = ObjectInspectorUtils.getStandardObjectInspector(returnObjectInspector,
        ObjectInspectorCopyOption.WRITABLE);
    return true;
  }

  if (!allowTypeConversion) {
    return false;
  }

  // Types are different, we need to check whether we can convert them to
  // a common base class or not.
  TypeInfo commonTypeInfo = null;
  if (isUnionAll) {
    commonTypeInfo = FunctionRegistry.getCommonClassForUnionAll(rTypeInfo, oiTypeInfo);
  } else {
    commonTypeInfo = FunctionRegistry.getCommonClass(oiTypeInfo, rTypeInfo);
  }
  if (commonTypeInfo == null) {
    return false;
  }

  commonTypeInfo = updateCommonTypeForDecimal(commonTypeInfo, oiTypeInfo, rTypeInfo);

  returnObjectInspector = TypeInfoUtils
      .getStandardWritableObjectInspectorFromTypeInfo(commonTypeInfo);

  return true;
}

Besides initialize, a GenericUDF subclass must also override two other methods, evaluate and getDisplayString, as shown in the class above.

3 - Global functions

When adding a temporary custom function, the AS clause must reference the fully qualified class name defined inside the jar (package plus class), not just the package or jar name, as shown below:

hive> add jar /home/SparkAdmin/HiveFunctions/Nvl.jar
> ;
Added [/home/SparkAdmin/HiveFunctions/Nvl.jar] to class path
Added resources: [/home/SparkAdmin/HiveFunctions/Nvl.jar]
hive> create temporary function NullReplace as 'hive.function.generic.Nvl';
FAILED: Class hive.function.generic.Nvl not found
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.FunctionTask
hive> create temporary function NullReplace as 'hive.function.generic.genericNvl';
OK

3.1 - Using the generic function

First, insert some test rows containing NULL values:

hive> insert into default.employee(name,salary,subordinates,deductions,address)
> select null,null,subordinates,deductions,address from default.employee
> limit 10 ;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Query ID = SparkAdmin_20181124142056_7af103f3-95de-4d42-9b64-77337ad06734
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2018-11-24 14:20:59,351 Stage-1 map = 100%, reduce = 0%
2018-11-24 14:21:00,368 Stage-1 map = 100%, reduce = 100%
Ended Job = job_local362424371_0001
Loading data to table default.employee
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 50910 HDFS Write: 298 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
Time taken: 4.982 seconds
hive> select * from default.employee ;
OK
ali 320.0 ["ali","acai","ayun"] {"ali":1,"acai":2,"ayun":7} {"street":"zhejiang","city":"hangzhou","state":"hubin","zip":"201210"}
liton 345.0 ["liton","acai","ayun"] {"liton":1,"acai":2,"ayun":7} {"street":"zhejiang","city":"hangzhou","state":"hubin","zip":"201210"}
tencent 543.0 ["tencent","acai","ayun"] {"tencent":1,"acai":2,"ayun":7} {"street":"zhejiang","city":"hangzhou","state":"hubin","zip":"201210"}
NULL NULL ["tencent","acai","ayun"] {"tencent":1,"acai":2,"ayun":7} {"street":"zhejiang","city":"hangzhou","state":"hubin","zip":"201210"}
NULL NULL ["liton","acai","ayun"] {"liton":1,"acai":2,"ayun":7} {"street":"zhejiang","city":"hangzhou","state":"hubin","zip":"201210"}
NULL NULL ["ali","acai","ayun"] {"ali":1,"acai":2,"ayun":7} {"street":"zhejiang","city":"hangzhou","state":"hubin","zip":"201210"}
Time taken: 0.115 seconds, Fetched: 6 row(s)
hive>

NULL replacement:

hive> select nullreplace(salary,0) as salary from default.employee ;
OK
320.0
345.0
543.0
0.0
0.0
0.0
Time taken: 0.109 seconds, Fetched: 6 row(s)

Even when the two arguments are nominally of different types, they are still converted to a common type:

hive> select nullreplace(salary,"end") as salary from default.employee ;
OK
320.0
345.0
543.0
end
end
end
Time taken: 0.1 seconds, Fetched: 6 row(s)
hive>

But when no implicit conversion exists the way it does between numbers and strings, the call fails:

hive> select nullreplace(salary,array("em","bm","fm")) as salary from default.employee ;
FAILED: NullPointerException null

3.2 - Making the function globally available

A temporary custom function only lives for the current session; once the session is closed or a new one is opened, the function can no longer be called.

hive> select nullreplace(name,"end") as name from default.name ;
FAILED: SemanticException [Error 10011]: Invalid function nullreplace

The simplest way to make a custom function callable in every session is to configure the ~/.hiverc (runtime configuration) file, so the function is defined as soon as a session starts.

Edit the ~/.hiverc file:

[SparkAdmin@centos00 bin]$ vi ~/.hiverc
add jar /home/SparkAdmin/HiveFunctions/Nvl.jar;
create temporary function NullReplace as 'hive.function.generic.genericNvl';
~

Creating a global function with CREATE FUNCTION

The .hiverc approach becomes cumbersome in large projects, so newer versions of Hive let you use CREATE FUNCTION to give a custom function a global lifetime; in essence, the function definition is stored in the metastore.

hive> create function nullreplace2 as 'hive.function.generic.genericNvl' using jar '/home/SparkAdmin/HiveFunctions/Nvl.jar';
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.FunctionTask. Hive warehouse is non-local, but /home/SparkAdmin/HiveFunctions/Nvl.jar specifies file on local filesystem. Resources on non-local warehouse should specify a non-local scheme/path
hive>

The fix is to copy the jar onto HDFS first:

[SparkAdmin@centos00 conf]$ hdfs dfs -copyFromLocal /home/SparkAdmin/HiveFunctions/Nvl.jar /user/hive/warehouse
[SparkAdmin@centos00 conf]$ hdfs dfs -ls /user/hive/warehouse
Found 5 items
-rw-r--r-- 3 SparkAdmin supergroup 1798 2018-11-24 20:41 /user/hive/warehouse/Nvl.jar
drwxr-xr-x - SparkAdmin supergroup 0 2018-11-05 22:04 /user/hive/warehouse/account
drwxr-xr-x - SparkAdmin supergroup 0 2018-11-09 23:03 /user/hive/warehouse/crm.db
drwxr-xr-x - SparkAdmin supergroup 0 2018-11-24 14:21 /user/hive/warehouse/employee
drwxr-xr-x - SparkAdmin supergroup 0 2018-10-31 16:17 /user/hive/warehouse/student
[SparkAdmin@centos00 conf]$

Then create the function again, pointing at the jar on HDFS:

hive> create function nullreplace2 as 'hive.function.generic.genericNvl' using jar 'hdfs:///user/hive/warehouse/Nvl.jar';
Added [/tmp/06ebd574-bcbc-4146-bc39-f195b8d0c9c2_resources/Nvl.jar] to class path
Added resources: [hdfs:///user/hive/warehouse/Nvl.jar]
OK
Time taken: 0.814 seconds
hive> select nullreplace2(name,"end") as name from default.employee ;
OK
ali
liton
tencent
end
end
end
Time taken: 1.93 seconds, Fetched: 6 row(s)
hive>

If some developers on the team use the Hive command line while others use Oracle SQL Developer, how can a custom function be shared across the whole team?

The answer is to create a global function.

Just as we did above with the jar on HDFS, create a global function from Oracle SQL Developer:

create function nullReplace_osd as 'hive.function.generic.genericNvl' using jar 'hdfs:///user/hive/warehouse/Nvl.jar'

Then open the Hive command line and call nullReplace_osd, the function created in Oracle SQL Developer:

hive> select default.nullReplace_osd(name,"end") as name from default.employee ;
Added [/tmp/8526a964-ef87-4924-a331-73013b31f553_resources/Nvl.jar] to class path
Added resources: [hdfs:///user/hive/warehouse/Nvl.jar]
OK
ali
liton
tencent
end
end
end
Time taken: 1.747 seconds, Fetched: 6 row(s)
hive>

Likewise, a global custom function created from the Hive command line can be called from Oracle SQL Developer:

hive> create function NullReplace_hcmd as 'hive.function.generic.genericNvl' using jar 'hdfs:///user/hive/warehouse/Nvl.jar';
Added [/tmp/8526a964-ef87-4924-a331-73013b31f553_resources/Nvl.jar] to class path
Added resources: [hdfs:///user/hive/warehouse/Nvl.jar]
OK
Time taken: 0.047 seconds
hive> select NullReplace_hcmd(name,"end") as name from default.employee;
OK
ali
liton
tencent
end
end
end
Time taken: 0.146 seconds, Fetched: 6 row(s)
hive>

Now restart Oracle SQL Developer if it is already open, and try calling a global custom function created from the Hive command line.

Execute the call:

select default.NullReplace_hcmd2(name,"end") as name from default.employee;

Error starting at line : 6 in command -
select default.NullReplace_hcmd2(name,"end") as name from default.employee
Error at Command Line : 6 Column : 1
Error report -
SQL 錯誤: [Cloudera][HiveJDBCDriver](500051) ERROR processing query/statement. Error Code: 10011, SQL state: TStatus(statusCode:ERROR_STATUS, infoMessages:[*org.apache.hive.service.cli.HiveSQLException:Error while compiling statement: FAILED: SemanticException [Error 10011]: Invalid function default.NullReplace_hcmd2:17:16, org.apache.hive.service.cli.operation.Operation:toSQLException:Operation.java:380, org.apache.hive.service.cli.operation.SQLOperation:prepare:SQLOperation.java:206, org.apache.hive.service.cli.operation.SQLOperation:runInternal:SQLOperation.java:290, org.apache.hive.service.cli.operation.Operation:run:Operation.java:320, org.apache.hive.service.cli.session.HiveSessionImpl:executeStatementInternal:HiveSessionImpl.java:530, org.apache.hive.service.cli.session.HiveSessionImpl:executeStatementAsync:HiveSessionImpl.java:517, org.apache.hive.service.cli.CLIService:executeStatementAsync:CLIService.java:310, org.apache.hive.service.cli.thrift.ThriftCLIService:ExecuteStatement:ThriftCLIService.java:530, org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement:getResult:TCLIService.java:1437, org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement:getResult:TCLIService.java:1422, org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39, org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39, org.apache.hive.service.auth.TSetIpAddressProcessor:process:TSetIpAddressProcessor.java:56, org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPoolServer.java:286, java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.java:1142,
java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadPoolExecutor.java:617, java.lang.Thread:run:Thread.java:745, *org.apache.hadoop.hive.ql.parse.SemanticException:Invalid function default.NullReplace_hcmd2:28:12, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:doPhase1GetAllAggregations:SemanticAnalyzer.java:636, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:doPhase1GetAggregationsFromSelect:SemanticAnalyzer.java:558, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:doPhase1:SemanticAnalyzer.java:1464, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:doPhase1:SemanticAnalyzer.java:1768, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:doPhase1:SemanticAnalyzer.java:1768, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:genResolvedParseTree:SemanticAnalyzer.java:11072, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:analyzeInternal:SemanticAnalyzer.java:11133, org.apache.hadoop.hive.ql.parse.CalcitePlanner:analyzeInternal:CalcitePlanner.java:286, org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer:analyze:BaseSemanticAnalyzer.java:258, org.apache.hadoop.hive.ql.Driver:compile:Driver.java:512, org.apache.hadoop.hive.ql.Driver:compileInternal:Driver.java:1317, org.apache.hadoop.hive.ql.Driver:compileAndRespond:Driver.java:1295, org.apache.hive.service.cli.operation.SQLOperation:prepare:SQLOperation.java:204], sqlState:42000, errorCode:10011, errorMessage:Error while compiling statement: FAILED: SemanticException [Error 10011]: Invalid function default.NullReplace_hcmd2), Query: select default.NullReplace_hcmd2(name,"end") as name from default.employee.

Querying the metastore database shows that all the functions were in fact created successfully; it is a permissions issue that is blocking the user's access:

SELECT TOP (1000) [FUNC_ID]
,[CLASS_NAME]
,[CREATE_TIME]
,[DB_ID]
,[FUNC_NAME]
,[FUNC_TYPE]
,[OWNER_NAME]
,[OWNER_TYPE]
FROM [metadata].[dbo].[FUNCS]

Hive permissions deserve their own article and will be covered separately.

Recompiling Hive

When you are fully confident in a custom function and it will be widely reused, contributing it to the Hive source and recompiling Hive is the traditional way to make it available everywhere, immediately. The obvious drawback is that this can destabilize the system, which is why the Hive developers provided CREATE FUNCTION as a remedy that makes a function globally usable without a rebuild.
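
For reference, registering a built-in this way amounts to one registration call in FunctionRegistry plus a rebuild. The sketch below shows roughly what that line looks like in Hive 1.x/2.x; treat the exact method and class names as assumptions to verify against your Hive version's source tree.

// Inside the static initializer of org.apache.hadoop.hive.ql.exec.FunctionRegistry
// (approximate; the registration API differs between Hive versions):
system.registerGenericUDF("nvl", GenericUDFNvl.class);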

