分批调用函数,解决数据库SQL in超长问题
背景:在使用PostgreSQL进行查询时,我们经常会利用in
关键字来筛选多个条件。然而,当in
中包含的元素数量过多时,可能会导致SQL语句超出数据库所能处理的长度限制,从而引发错误。这种情况不仅影响查询的成功率,还可能导致系统性能下降,增加了开发和维护的复杂性。本文将探讨解决这一问题的有效方法,帮助开发者在实际应用中避免类似的困扰。
错误提示如下:
An I/O error occurred while sending to the backend.
、Tried to send an out-of-range integer as a 2-byte value: 39887
Caused by: org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:336)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:446)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:370)
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:149)
at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:138)
at kd.bos.ksql.shell.KDPreparedStatement.execute(KDPreparedStatement.java:244)
at com.zaxxer.hikari.pool.ProxyPreparedStatement.execute(ProxyPreparedStatement.java:44)
at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.execute(HikariProxyPreparedStatement.java)
at kd.bos.util.jdbcproxy.PreparedStatementProxy.execute(PreparedStatementProxy.java:147)
at kd.bos.util.jdbcproxy.PreparedStatementProxy.execute(PreparedStatementProxy.java:147)
at kd.bos.trace.instrument.jdbc.PreparedStatementAOP.execute(PreparedStatementAOP.java:24)
at kd.bos.db.DBImpl.query(DBImpl.java:327)
... 71 more
Caused by: java.io.IOException: Tried to send an out-of-range integer as a 2-byte value: 39887
at org.postgresql.core.PGStream.sendInteger2(PGStream.java:252)
at org.postgresql.core.v3.QueryExecutorImpl.sendParse(QueryExecutorImpl.java:1470)
at org.postgresql.core.v3.QueryExecutorImpl.sendOneQuery(QueryExecutorImpl.java:1793)
at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1356)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:301)
错误的原因是:
PostgreSQL对每个语句支持最大的变量个数为32767。
在PGjdbc的GitHub上已有相关issue,但是还没有得到修复。目前可行的解决方案是,将查询分割成多个操作。
解决方案:通用函数
针对分割多个操作,在此基于Java8做了一个通用函数。可以实现分批执行函数,来解决该问题。也可以应用在并发的分批操作中,比如将一个大循环分批、并发执行以提升效率的场景。
以下是完整代码:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
/**
* 分批调用函数,解决数据库SQL in超长问题
*
* @author yuanjun_liu
* @version 1.0.0
* @since 2021/12/03
*/
public class BatchCallFunction<T, R> {
private static final int BATCH_NUMBER = 100;
/**
* 分批调用
*
* @param original 总ID数据
* @param function 处理函数
* @return 执行结果
*/
public List<R> call(T[] original, Function<? super T[], ? extends List<R>> function) {
// 初始化容量:16,解决20000 * 16的长度
List<R> result = new ArrayList<>(1 << 4);
int totalSize = original.length;
if (totalSize <= BATCH_NUMBER) {
return function.apply(original);
} else {
// 分批处理,每一批处理的数最多为{@link com.java.BatchCallFunction.BATCH_NUMBER}
int batchSize = totalSize / BATCH_NUMBER + 1;
for (int i = 0; i < batchSize; i++) {
int from = BATCH_NUMBER * i;
int to = BATCH_NUMBER * (i + 1);
if (to > totalSize) {
to = totalSize;
}
T[] subMaterials = Arrays.copyOfRange(original, from, to);
List<R> apply = function.apply(subMaterials);
result.addAll(apply);
}
}
return result;
}
public static void main(String[] args) {
AtomicInteger val = new AtomicInteger(0);
// define call function
Function<Integer[], List<Integer>> function = params -> {
// test implements method:
// 1、print first element
// 2、return result
System.out.println(params[0]);
List<Integer> result = new ArrayList<>();
result.add(val.getAndIncrement());
return result;
};
// 100005 seed
int total = BATCH_NUMBER * 5 + 5;
Integer[] seed = new Integer[total];
for (int i = 0; i < total; i++) {
seed[i] = i;
}
List<Integer> call = new BatchCallFunction<Integer, Integer>().call(seed, function);
System.out.println(call);
}
}