Agregación de archivos JAR a Athena para Apache Spark
Puede importar archivos JAR que implementen UDF (funciones definidas por el usuario) personalizadas a Athena para Apache Spark.
No puede usar archivos JAR para agregar nuevos conectores o nuevos formatos de tabla, como Apache Iceberg, a una sesión de Spark en Athena. La sesión de Spark debe configurarse con el archivo JAR cuando se inicie la sesión y Athena no admite la agregación de archivos JAR al inicio de una sesión de Spark.
Agregación de archivos JAR y registro de una UDF
El siguiente procedimiento muestra cómo agregar un archivo JAR de Amazon S3 en su cuaderno de Athena y cómo registrar una UDF.
Agregar un archivo JAR y registrar una función definida por el usuario
-
Inserte el siguiente código Java de ejemplo en un archivo llamado
custom-UDF1.jar
. El código implementa una función definida por el usuario llamadacustomUDF1
que invierte las cadenas de texto que se le pasan.package pkg.customUDF1; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.io.Text; import java.util.Arrays; import java.util.Objects; import java.util.stream.Collectors; public class UDFClass1 extends GenericUDF { @Override public org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector initialize(org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector[] objectInspectors) { if (objectInspectors.length != 1) { throw new RuntimeException("argument length must be 1"); } if (objectInspectors[0].getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new IllegalArgumentException("input type should be primitive"); } return PrimitiveObjectInspectorFactory.writableStringObjectInspector; } @Override public Object evaluate(DeferredObject[] deferredObjects) throws HiveException { StringBuilder sb = new StringBuilder(); sb.append(deferredObjects[0].get()).reverse(); String str = sb.toString(); return new Text(str); } @Override public String getDisplayString(String[] strings) { return "customUDF1"; } }
-
Cree el archivo
custom-UDF1.jar
de Java y, a continuación, cárguelo a una ubicación propia en Amazon S3 (por ejemplo,s3://
).DOC-EXAMPLE-BUCKET
/custom-UDF1.jar -
En su cuaderno de Athena para Spark, ejecute los siguientes comandos. Asegúrese de utilizar el esquema
s3a://
paraadd jar
.spark.sql('add jar s3a://
DOC-EXAMPLE-BUCKET
/custom-UDF1.jar') spark.sql('create temporary function reverse as "pkg.customUDF1.UDFClass1"') -
Cree y muestre una tabla sencilla que pueda utilizar para probar la función.
spark.createDataFrame(["athena", "spark"], "String").write.saveAsTable("simple_table") spark.sql('select * from simple_table').show() >> +------+ | value| +------+ |athena| | spark| +------+
-
Ejecute la UDF en la tabla y muestre el resultado.
spark.sql('select reverse(value) from simple_table').show() >> +--------------+ |reverse(value)| +--------------+ | anehta| | kraps| +--------------+