RisingWave 用户定义函数 (一) :概览

|作者:王润基 RisingWave Labs 内核开发工程师

用户定义函数(User Defined Function,以下简称 UDF)是数据系统中的常见功能。它允许用户使用各种语言定义自己的函数,作为内置函数的补充,以实现各种定制化执行逻辑。通过 UDF,我们可以将多个已有函数组合起来形成新函数,简化查询逻辑;也可以使用 Python 等语言编写代码,借用其它语言的生态,填补 SQL 语言和内置函数表达能力的不足;除了纯计算以外,我们还可以调用外部系统 API,将外部服务集成到统一的数据处理管线中。可以说,UDF 的存在极大提升了数据处理系统的灵活性和扩展性。

在之前的文章中,我们介绍了 RisingWave 内部函数的开发框架,它主要面向内部开发者。接下来,我们将通过一系列文章介绍面向用户的 UDF 功能的设计与实现过程。它在技术上与前者一脉相承,但对易用性提出了更高的要求。作为该系列第一篇文章,我们先来概览一下 UDF 的种类、用户接口和使用场景。

UDF 的种类

UDF 可以从三个维度进行分类。

第一个维度是函数的输入输出。在 SQL 中,我们有以下几种常见的函数:

  • 标量函数(Scalar Function):输入一行,输出一行。例如 abs(-1) -> 1
  • 表值函数(Table Function):输入一行,输出多行。例如 generate_series(1,3) -> {1,2,3}
  • 聚合函数(Aggregate Function):输入多行,输出一行。例如 sum({1,2,3}) -> 6 。
  • 窗口函数(Window Function):输入多行,输出多行。例如 lag({1,2,3}) -> {NULL,1,2}

这里每一种函数都可以支持用户自定义。在 UDF 的语境下,我们通常将它们简称为 UDF、UDTF、UDAF 和 UDWF。RisingWave 目前支持 UDF 和 UDTF,它们可以覆盖绝大多数实际需求。我们也正在开发 UDAF 以支持更多场景。

第二个维度是编写函数的语言。目前 RisingWave 已经支持的语言包括 SQL、Python、Java、JavaScript 和 Rust。它们面向的用户和场景也略有不同:

  • SQL 是 RisingWave 的原生语言。SQL UDF 并没有提供超出 SQL 本身的表达能力,它只是将已有的函数进行组合,实现代码复用和简化查询的效果。
  • Python 是数据科学和人工智能领域的常用语言。它编写简单、生态丰富、用户众多,因此是我们最早支持的 UDF 语言。但它作为解释执行的脚本语言,运行速度很慢,不适合实现重计算任务。
  • Java 是大数据系统的主流语言。包括 Hadoop、Spark、Flink、Kafka 等在内的 Apache 生态都使用 Java 编写。因此 Java UDF 主要面向从以上系统迁移而来的用户。Java 语言性能较高,但其编写和部署方式较为繁琐。
  • JavaScript 是 Web 前后端的主流语言。近年来其生态迅猛发展,有赶超 Python 的趋势。JavaScript UDF 主要面向前后端开发者。
  • Rust 是高性能系统编程语言,也是 RisingWave 本身的开发语言。Rust UDF 适合编写对性能要求高的重计算任务。它会被编译为 WebAssembly 在 RisingWave 内置的容器中运行。

总的来说,我们的目标是让用户能够用任何喜欢的语言和适合的语言来编写函数,不让语言成为表达逻辑的障碍。

第三个维度是函数的执行方式。这决定了函数的性能和能力。目前我们有以下三种执行方式:

  • 内联执行:特指 SQL UDF。它会在前端直接展开 inline 到表达式中。因此和手动调用多个函数相比,性能几乎没有区别。
  • 嵌入式执行:函数运行在 RisingWave 计算节点内嵌的语言 runtime 中。由于直接在进程内调用,它几乎没有额外通信开销。但出于安全考虑,我们目前不开放 runtime 的外部访问,也就是说这些函数只能进行纯计算,不能访问网络。目前 Python、JavaScript 和 Rust 支持嵌入式执行。
  • 外部函数(External Function):函数运行在一个独立的进程中,以 RPC 的方式向 RisingWave 提供服务。目前外部函数支持 Python 和 Java 语言。这种方式提供了最大的灵活性,用户可以在这个进程中做任何事情。而它与 RisingWave 本身天然隔离,使得 RisingWave 无须担心用户定义函数与其争抢资源或干扰运行。但这种方式最大的问题在于性能,因为 RPC 会引入很大的延迟,导致整个数据流的阻塞(尤其是当在云上和 RisingWave 部署在不同的数据中心时)。此外,它对用户的部署和运维也带来不少麻烦。

在 RisingWave 的开发过程中,我们首先实现的是 Python 语言的外部函数。这也是目前用户使用最多的一种 UDF。为了解决它带来的性能和易用性的问题,我们又开发了 SQL UDF 和嵌入式执行的 UDF。在之后的文章中,我们将深入它们的设计与实现,并探讨相关问题与解决方案。

RisingWave UDF 用户接口

为了让大家对以上五花八门的 UDF 有一个具体的概念,我们来看一下用户在 RisingWave 中如何定义 UDF。这里我们举几个经典的使用场景,更多详细用法可以查看 RisingWave 官方文档。

示例 1:使用 SQL UDF 简化查询

例如我们想将学生的百分制分数转换为成绩等级。这一逻辑可以使用 SQL 内置的 case when 语句实现,但是它表达起来比较繁琐,写在查询中可读性不佳。于是我们可以创建一个 SQL UDF 表达这一过程。在 RisingWave 中,使用 create function 语句即可定义一个函数:

create function grade(score int) returns varchar language sql as $$
    select case score >= 100 then 'A+'
        when score >= 90 then 'A'
        when score >= 80 then 'B'
        when score >= 70 then 'C'
        when score >= 60 then 'D'
        else 'F'
    end;
$$;

然后,我们就可以像内置函数一样在任何地方调用这个 UDF 了:

select score, grade(score) from generate_series(50, 100, 10) t(score);
 score | grade 
-------+-------
    50 | F
    60 | D
    70 | C
    80 | B
    90 | A
   100 | A+
(6 rows)

示例 2:使用 Rust UDF 进行数学计算

如果我们想实现一些内置函数不支持的数学运算,例如求最大公约数(GCD),可以使用 Rust UDF。和上面的 SQL UDF 类似,我们可以将 Rust 代码嵌入 create function 语句中。

create function gcd(int, int) returns int language rust as $$
    fn gcd(mut a: i32, mut b: i32) -> i32 {
        while b != 0 {
            (a, b) = (b, a % b);
        }
        a
    }
$$;

这段代码会在 RisingWave 前端被编译到 WebAssembly,然后在计算节点上以 JIT 方式运行,以达到接近原生的性能。其它一些需要高性能定制计算逻辑的场景,例如量化因子提取,也适合使用 Rust UDF 实现。

示例 3:使用 Rust 第三方库解析 protobuf 数据

解析 Protobuf 也可以使用 Rust UDF。只是此时我们需要使用一些第三方库(例如 prost),先从 .proto 文件生成 Rust 代码,然后再对数据进行提取。整个过程需要创建一个完整的 cargo 项目,仅仅在语句中嵌入代码是无法完成这一需求的。

// lib.rs
use arrow_udf::{function, types::StructType};
use prost::{DecodeError, Message};

// 导入从 .proto 生成的 Rust 代码
pub mod proto {
    include!(concat!(env!("OUT_DIR"), "/proto.rs"));
}

// 定义返回结构体
#[derive(StructType)]
struct DataKey {
    stream: String,
    pan: String,
}

// 定义解析函数
#[function("decode_proto(bytea) -> struct DataKey")]
fn decode_proto(data: &[u8]) -> Result<DataKey, DecodeError> {
    let data_key = proto::DataKey::decode(data)?;
    Ok(DataKey {
        stream: data_key.stream,
        pan: data_key.pan,
    })
}

对于这样的场景,我们允许用户自己创建 Rust 项目并编译成 WebAssembly 模块。然后通过 CREATE FUNCTION 语句将 WASM 模块以 BASE64 编码的方式直接导入 RisingWave 中。

\\set wasm_binary `base64 -i target/release/decode.wasm`
create function decode_proto(bytea) returns struct<stream varchar, pan varchar>
language wasm using base64 :'wasm_binary';

具体操作方法可以参考 RisingWave 文档。

示例 4:使用 JavaScript UDTF 处理 JSON 数据

假设我们想获得 RisingWave 项目的贡献者名单。我们访问 Github REST API 获取到一个 JSON 列表,希望从中提取出想要的信息。于是我们用 JavaScript 定义一个 UDTF,输入 API 返回的 JSON,然后逐行输出开发者昵称和贡献数。

create function contributors(response jsonb)
returns table (name varchar, contributions int)
language javascript as $$
    for (let user of response) {
        yield { name: user.login, contributions: user.contributions };
    }
$$;
select * from contributors('<response>') limit 5;
      name       | contributions 
-----------------+---------------
 BugenZhao       |           616
 skyzh           |           482
 TennyZhuang     |           465
 xxchan          |           435
 kwannoel        |           410

示例 5:使用 Python 外部函数调用 LLM 服务

如果你想在流处理中调用时下最流行的大语言模型服务,可以使用 Python 定义外部函数。下面的脚本创建了一个 UDF Server,其中定义了一个函数,内部调用 OpenAI API 生成文本:

from risingwave.udf import udf, UdfServer
from openai import OpenAI

client = OpenAI()

@udf(input_types=["VARCHAR"], result_type="VARCHAR", io_threads=8)
def ask_ai(content):
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": content},
        ],
    )
    return response.choices[0].message.content

if __name__ == "__main__":
    server = UdfServer(location="localhost:8815")
    server.add_function(ask_ai)
    server.serve()

当 UDF 服务启动后,我们可以在 RisingWave 中创建函数,与 Python 服务器建立连接。

create function ask_ai(varchar) returns varchar
as ask_ai using link '<http://localhost:8815>';

总结

用户可以通过 CREATE FUNCTION 语句在 RisingWave 中创建自定义函数。函数需要指定参数和返回值类型,然后嵌入实现代码,或者指定外部函数的地址。用户可以根据实际使用场景,选择适合的语言实现 UDF。

使用场景适合的 UDF
组合函数,简化查询SQL
高性能计算Rust
数据格式转换Rust / JS / Python
调用外部服务Python 外部函数
从 Flink 等系统迁移Java 外部函数

实现方式

RisingWave UDF 使用 Apache Arrow 作为数据接口格式。这是因为 UDF 涉及跨语言、跨进程的数据交换,而 Arrow 已经成为这一领域的事实标准。使用 Arrow 一方面可以直接复用其现有生态,例如我们基于 Arrow Flight RPC、pyarrow 和 Arrow Java API 实现了 Python 和 Java 的外部函数框架;另一方面,我们实现的 UDF 开发框架也可以被其它基于 Arrow 的项目所使用。

最近,我们将所有 UDF 实现代码从 RisingWave 中抽离出来,作为一个独立项目 arrow-udf 发布到社区。很快,隔壁 Databend 就基于这一框架为他们的系统实现了 Python、JavaScript 和 WASM 的 UDF 支持。还有一位热心的社区开发者主动为 JavaScript UDF 添加了 Deno 后端支持。 目前,这一 UDF 生态已经初现雏形,相信借助社区的力量未来能够获得更好的发展。如果你恰好也在用 Rust 开发数据系统,不妨尝试集成一下 arrow-udf,无论是 UDF 还是内置函数都可以用哦。

RisingWave 各种语言 UDF 的模块结构

说完了接口,后面的实现工作其实就比较 trivial 了。对于嵌入式 UDF,包括 Rust (WebAssembly)、JavaScript 和 Python,我们分别嵌入了 wasmtime、quickjs / deno 和 CPython 作为语言运行时。而对于外部函数,我们使用 Arrow Flight RPC 将数据发送给远端 UDF 进程,UDF 进程则使用我们提供的 Python SDK 和 Java SDK 进行开发。无论何种形式,都会有一段胶水代码负责将 Arrow 中数据转换为语言的原生类型,最终调用到用户定义的函数中。本质上这是一个 FFI 的工作。

我们会在接下来的两篇文章中展开介绍 Python 外部函数 和 基于 WebAssembly 的 Rust UDF 的具体实现。敬请期待!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/633306.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【前端】使用 Canvas 实现贪吃蛇小游戏

使用 Canvas 实现贪吃蛇小游戏 在这篇博客中&#xff0c;我们将介绍如何使用 HTML5 Canvas 和 JavaScript 实现一个简单的贪吃蛇&#xff08;Snake&#xff09;小游戏。这个项目是一个基础的游戏开发练习&#xff0c;它可以帮助你理解如何在 Canvas 上绘图、如何处理用户输入以…

【九十三】【算法分析与设计】719. 找出第 K 小的数对距离,N 台电脑的最长时间,二分答案法

719. 找出第 K 小的数对距离 - 力扣&#xff08;LeetCode&#xff09; 数对 (a,b) 由整数 a 和 b 组成&#xff0c;其数对距离定义为 a 和 b 的绝对差值。 给你一个整数数组 nums 和一个整数 k &#xff0c;数对由 nums[i] 和 nums[j] 组成且满足 0 < i < j < nums.le…

校园网拨号上网环境下多开虚拟机,实现宿主机与虚拟机互通,并访问外部网络

校园网某些登录客户端只允许同一时间一台设备登录&#xff0c;因此必须使用NAT模式共享宿主机的真实IP&#xff0c;相当于访问外网时只使用宿主机IP&#xff0c;此方式通过虚拟网卡与物理网卡之间的数据转发实现访问外网及互通 经验证&#xff0c;将centos的物理地址与主机物理…

UMPNet: Universal Manipulation Policy Network for Articulated Objects

1. 摘要 UMPNet是一个基于图像的策略网络&#xff0c;能够推理用于操纵铰接物体的闭环动作序列。该策略支持6DoF动作表示和可变长度轨迹。 为处理多种类的物体&#xff0c;该策略从不同的铰接结构中学习&#xff0c;并泛化到未见过的物体或类别上。该策略是以自监督探索的方式…

利用Python队列生产者消费者模式构建高效爬虫

目录 一、引言 二、生产者消费者模式概述 三、Python中的队列实现 四、生产者消费者模式在爬虫中的应用 五、实例分析 生产者类&#xff08;Producer&#xff09; 消费者类&#xff08;Consumer&#xff09; 主程序 六、总结 一、引言 随着互联网的发展&#xff0c;信…

css使用clip-path裁剪出不规则图形并绑定点击事件

点击图片的红色区域触发事件 点击图片黑色不触发点击事件&#xff0c;代码演示效果如下&#xff1a; 代码演示效果 1.png&#xff08;尺寸 200*470&#xff09; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><…

2025第十届美陈展

展位又遭疯抢&#xff01;2025第十届美陈展释放“无界之美” 美是全球通用的语言&#xff0c;人类对美的追求始终如一&#xff0c;大众审美在经历了时代的变迁后开始趋同&#xff0c;东方文明深处的美学经济开始崛起。 在如今商业迈入存量阶段&#xff0c;以品牌为突破口打造…

抽象工厂模式(AbstractFactoryPattern)

文章目录 1.抽象工厂模式定义2.UML类图3.抽象工厂模式具体实现工厂模式实现单一产品族抽象工厂实现多产品族产品类工厂类使用 4.抽象工厂模式优缺点 1.抽象工厂模式定义 提供一个创建一系列相关或相互依赖对象的接口&#xff0c;而无需指定它们具体的类。 工厂方法模式是单一产…

JavaScript-运算符

算术运算符 返回结果为数字型的运算符 加法运算符 加法运算符&#xff08;&#xff09;是一个二元运算符&#xff0c;可以对两个数字型的操作数进行相加运算&#xff0c;返回值是两个操作数的和 减法运算符 减法运算符&#xff08;-&#xff09;是一个二元运算符&#xff0c;可…

banner2.0自定义轮播布局

说明&#xff1a;最近碰到一个需求&#xff0c;让新闻列表实现轮播图的效果&#xff0c;也就是轮播新闻&#xff0c;然后样式必须按照ui设计的样式来弄&#xff0c;之前传统的banner&#xff0c;都是只轮播图片&#xff0c;没想到&#xff0c;这次居然要轮播新闻&#xff0c; 网…

【深度学习】YOLOv8训练,交通灯目标检测

文章目录 一、数据处理二、环境三、训练 一、数据处理 import traceback import xml.etree.ElementTree as ET import os import shutil import random import cv2 import numpy as np from tqdm import tqdmdef convert_annotation_to_list(xml_filepath, size_width, size_he…

java+ vue.js+uniapp一款基于云计算技术的企业级生产管理系统,云MES源码 MES系统如何与ERP系统集成?

java vue.jsuniapp一款基于云计算技术的企业级生产管理系统&#xff0c;云MES源码&#xff0c;MES系统如何与ERP系统集成&#xff1f; MES系统&#xff08;制造执行系统&#xff09;与ERP系统&#xff08;企业资源规划系统&#xff09;的集成可以通过多种方式实现&#xff0c;这…

【git】开发提交规范(feat、fix、perf)

这段时间收到的需求很多&#xff0c;可能是临近两周一次的大版本灰度上线&#xff0c;这次产生了一个关于git的思考&#xff0c;就是各个版本之间怎么管理的问题&#xff0c;这里做出我自己的一些方法。 首先&#xff0c;既然已经明确了remote分支中的release分支为主分支&…

Java中transient关键字

transient介绍 在Java中&#xff0c;transient是一个关键字&#xff0c;用于声明一个字段在序列化过程中应该被忽略。当一个对象被序列化时&#xff0c;它的状态&#xff08;即其字段的值&#xff09;通常会被保存到字节流中&#xff0c;以便稍后可以反序列化恢复对象的状态。…

如何使用Matlab进行三角剖分(自定义函数实现delaunayTriangulation 使用Bowyer-Watson 算法)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 一、Delaunay三角形 二、使用步骤 1.Bowyer-Watson算法 2.算法步骤 三、动画演示 四、核心代码 五、对比matlab自带函数和我们的算法&#xff1a; 总结 前…

巨某量引擎后台登录实战笔记 | Playwright自动化框架

前言 本文章中所有内容仅供学习交流&#xff0c;抓包内容、敏感网址、数据接口均已做脱敏处理&#xff0c;严禁用于商业用途和非法用途&#xff0c;否则由此产生的一切后果均与作者无关&#xff0c;若有侵权&#xff0c;请联系我立即删除&#xff01; 入正题看看滑块是怎么个事…

CasaOS系统玩客云安装内网穿透工具实现无公网IP远程访问

文章目录 前言1. CasaOS系统介绍2. 内网穿透安装3. 创建远程连接公网地址4. 创建固定公网地址远程访问 前言 2月底&#xff0c;玩客云APP正式停止运营&#xff0c;不再提供上传、云添加功能。3月初&#xff0c;有用户进行了测试&#xff0c;局域网内的各种服务还能继续使用&am…

Ai自动贴图直播项目的趋势,智享自动直播GMV增加工具

在当今社会&#xff0c;直播行业正在悄然地改变着人们的生活方式。无论是在闲暇时光中放松身心&#xff0c;还是在临睡前享受休闲娱乐&#xff0c;观众们越来越习惯于通过刷短视频或者观看直播来消遣自己。根据统计数据显示&#xff0c;到2023年全球将有超过10.74亿网民&#x…

Android 12系统源码_多窗口模式(二)系统实现分屏的功能原理

前言 上一篇我们具体分析了系统处于多窗口模式下&#xff0c;Android应用和多窗口模式相关方法的调用顺序&#xff0c;对于应用如何适配多窗口模式有了一个初步的认识&#xff0c;本篇文章我们将会结合Android12系统源码&#xff0c;具体来梳理一下系统是如何触发多窗口分屏模…

2024全新爆款好物推荐,618必买数码好物清单吐血整理!

​距离618购物狂欢节越来越近了&#xff0c;有很多日常价格不菲的产品在这次活动期间都会进行促销活动&#xff0c;尤其是数码类产品&#xff0c;加上618的优惠活动更有吸引力了。不过面对大促的热潮我们消费者在选购商品的同时还是要擦亮眼睛&#xff0c;避免买到质量不好的商…