
- 添加依赖
- 基于 Flink 服务提交任务并执行时需要的依赖包
- 构建ClickhouseSink参数实例
- 构建自定义ClickhouseStoreSink
基于 Flink 服务提交任务并执行时需要的依赖包ru.yandex.clickhouse clickhouse-jdbc0.2.4
基于 flink 服务器提交任务前,先上传依赖包到 flink 的 lib 目录下;然后重启 flink 服务,使 jar 进行加载;否则会出现 ClassNoFoundException 或者 clickhouseNoClassDefFound:ErrCould not initialize class ru.yandex.clickhouse.ClickHouseDriver 的异常。
- clickhouse-jdbc-0.2.4.jar
- httpclient-4.5.2.jar
- httpcore-4.4.4.jar
- commons-logging-1.0.4.jar
- guava-19.0.jar
public class ClickhouseSink implements Serializable {
private static final long serialVersionUID = -4410041701538783205L;
private String url;
private String index;
private String username;
private String password;
public String getUrl() {
return url;
}
public String getIndex() {
return index;
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
public ClickhouseSink(Object obj) {
final JSONObject json = JSONObject.parseObject(obj.toString());
this.url = json.getString("url");
this.index = json.getString("index");
this.username = json.getString("username");
this.password = json.getString("password");
}
}
构建自定义ClickhouseStoreSink
基于继承 RichSinkFunction< T > 抽象类实现自定义Sink,实现方法有三个:
- open():构建sink节点时最先执行的方法,用于实现一些初始化动作。
- invoke():执行节点时执行,用于实现具体业务逻辑。
- close():关闭节点回收资源时执行,用于资源的回收。
import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement; import java.util.Date; import java.util.Map; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.ygsoft.dataprocess.vo.sink.ClickhouseSink; public class ClickhouseStoreSink extends RichSinkFunction
微信扫一扫
支付宝扫一扫
评论列表(0条)