软件编程
位置:首页>> 软件编程>> java编程>> spring异步service中处理线程数限制详解

spring异步service中处理线程数限制详解

作者:soft_xiang  发布时间:2021-09-02 23:20:12 

标签:spring,异步,service

情况简介

spring项目,controller异步调用service的方法,产生大量并发。

具体业务:

前台同时传入大量待翻译的单词,后台业务接收单词,并调用百度翻译接口翻译接收单词并将翻译结果保存到数据库,前台不需要实时返回翻译结果。

处理方式:

controller接收文本调用service中的异步方法,将单词先保存到队列中,再启动2个新线程,从缓存队列中取单词,并调用百度翻译接口获取翻译结果并将翻译结果保存到数据库。

本文主要知识点:

多线程同时(异步)调用方法后,开启新线程,并限制线程数量。

代码如下:


@Service
public class LgtsAsyncServiceImpl {
/** logger日志. */
public static final Logger LOGGER = Logger.getLogger(LgtsAsyncServiceImpl2.class);

private final BlockingQueue<Lgts> que = new LinkedBlockingQueue<>();// 待翻译的队列
private final AtomicInteger threadCnt = new AtomicInteger(0);// 当前翻译中的线程数
private final Vector<String> existsKey = new Vector<>();// 保存已入队列的数据
private final int maxThreadCnt = 2;// 允许同时执行的翻译线程数
private static final int NUM_OF_EVERY_TIME = 50;// 每次提交的翻译条数
private static final String translationFrom = "zh";

@Async
public void saveAsync(Lgts t) {
 if (Objects.isNull(t) || StringUtils.isAnyBlank(t.getGco(), t.getCode())) {
  return;
 }
 offer(t);
 save();
 return;
}

private boolean offer(Lgts t) {
 String key = t.getGco() + "-" + t.getCode();
 if (!existsKey.contains(key)) {
  existsKey.add(key);
  boolean result = que.offer(t);
  // LOGGER.trace("待翻译文字[" + t.getGco() + ":" + t.getCode() + "]加入队列结果[" + result
  // + "],队列中数据总个数:" + que.size());
  return result;
 }
 return false;
}

@Autowired
private LgtsService lgtsService;

private void save() {
 int cnt = threadCnt.incrementAndGet();// 当前线程数+1
 if (cnt > maxThreadCnt) {
  // 已启动的线程大于设置的最大线程数直接丢弃
  threadCnt.decrementAndGet();// +1的线程数再-回去
  return;
 }
 GwallUser user = UserUtils.getUser();
 Thread thr = new Thread() {
  public void run() {
   long sleepTime = 30000l;
   UserUtils.setUser(user);
   boolean continueFlag = true;
   int maxContinueCnt = 5;// 最大连续休眠次数,连续休眠次数超过最大休眠次数后,while循环退出,当前线程销毁
   int continueCnt = 0;// 连续休眠次数

while (continueFlag) {// 队列不为空时执行
    if (Objects.isNull(que.peek())) {
     try {
      if (continueCnt > maxContinueCnt) {
       // 连续休眠次数达到最大连续休眠次数,当前线程将销毁。
       continueFlag = false;
       continue;
      }
      // 队列为空,准备休眠
      Thread.sleep(sleepTime);
      continueCnt++;
      continue;
     } catch (InterruptedException e) {
      // 休眠失败,无需处理
      e.printStackTrace();
     }
    }
    continueCnt = 0;// 重置连续休眠次数为0

List<Lgts> params = new ArrayList<>();
    int totalCnt = que.size();
    que.drainTo(params, NUM_OF_EVERY_TIME);
    StringBuilder utf8q = new StringBuilder();
    String code = "";
    List<Lgts> needRemove = new ArrayList<>();
    for (Lgts lgts : params) {
     if (StringUtils.isAnyBlank(code)) {
      code = lgts.getCode();
     }
     // 移除existsKey中保存的key,以免下面翻译失败时再次加入队列时,加入不进去
     String key = lgts.getGco() + "-" + lgts.getCode();
     existsKey.remove(key);

if (!code.equalsIgnoreCase(lgts.getCode())) {// 要翻译的目标语言与当前列表中的第一个不一致
      offer(lgts);// 重新将待翻译的语言放回队列
      needRemove.add(lgts);
      continue;
     }
     utf8q.append(lgts.getGco()).append("\n");
    }
    params.removeAll(needRemove);
    LOGGER.debug("队列中共" + totalCnt + " 个,获取" + params.size() + " 个符合条件的待翻译内容,编码:" + code);
    String to = "en";
    if (StringUtils.isAnyBlank(utf8q, to)) {
     LOGGER.warn("调用翻译出错,未找到[" + code + "]对应的百度编码。");
     continue;
    }
    Map<String, String> result = getBaiduTranslation(utf8q.toString(), translationFrom, to);
    if (Objects.isNull(result) || result.isEmpty()) {// 把没有获取到翻译结果的重新放回队列
     for (Lgts lgts : params) {
      offer(lgts);
     }
     LOGGER.debug("本次翻译结果为空。");
     continue;
    }
    int sucessCnt = 0, ignoreCnt = 0;
    for (Lgts lgts : params) {
     lgts.setBdcode(to);
     String gna = result.get(lgts.getGco());
     if (StringUtils.isAnyBlank(gna)) {
      offer(lgts);// 重新将待翻译的语言放回队列
      continue;
     }
     lgts.setStat(1);
     lgts.setGna(gna);
     int saveResult = lgtsService.saveIgnore(lgts);
     if (0 == saveResult) {
      ignoreCnt++;
     } else {
      sucessCnt++;
     }
    }
    LOGGER.debug("待翻译个数:" + params.size() + ",翻译成功个数:" + sucessCnt + ",已存在并忽略个数:" + ignoreCnt);
   }
   threadCnt.decrementAndGet();// 运行中的线程数-1
   distory();// 清理数据,必须放在方法最后,否则distory中的判断需要修改
  }

/**
   * 如果是最后一个线程,清空队列和existsKey中的数据
   */
  private void distory() {
   if (0 == threadCnt.get()) {
    // 最后一个线程退出时,执行清理操作
    existsKey.clear();
    que.clear();
   }
  }
 };
 thr.setDaemon(true);// 守护线程,如果主线程执行完毕,则此线程会自动销毁
 thr.setName("baidufanyi-" + RandomUtils.nextInt(1000, 9999));
 thr.start();// 启动插入线程
}

/**
 * 百度翻译
 *
 * @param utf8q
 *   待翻译的字符串,需要utf8格式的
 * @param from
 *   百度翻译语言列表中的代码
 *   参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList
 * @param to
 *   百度翻译语言列表中的代码
 *   参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList
 * @return 翻译结果
 */
private Map<String, String> getBaiduTranslation(String utf8q, String from, String to) {
 Map<String, String> result = new HashMap<>();
 String baiduurlStr = "http://api.fanyi.baidu.com/api/trans/vip/translate";
 if (StringUtils.isAnyBlank(baiduurlStr)) {
  LOGGER.warn("百度翻译API接口URL相关参数为空!");
  return result;
 }
 Map<String, String> params = buildParams(utf8q, from, to);
 if (params.isEmpty()) {
  return result;
 }

String sendUrl = getUrlWithQueryString(baiduurlStr, params);
 try {
  HttpClient httpClient = new HttpClient();
  httpClient.setMethod("GET");
  String remoteResult = httpClient.pub(sendUrl, "");
  result = convertRemote(remoteResult);
 } catch (Exception e) {
  LOGGER.info("百度翻译API返回结果异常!", e);
 }
 return result;
}

private Map<String, String> convertRemote(String remoteResult) {
 Map<String, String> result = new HashMap<>();
 if (StringUtils.isBlank(remoteResult)) {
  return result;
 }
 JSONObject jsonObject = JSONObject.parseObject(remoteResult);
 JSONArray trans_result = jsonObject.getJSONArray("trans_result");
 if (Objects.isNull(trans_result) || trans_result.isEmpty()) {
  return result;
 }
 for (Object object : trans_result) {
  JSONObject trans = (JSONObject) object;
  result.put(trans.getString("src"), trans.getString("dst"));
 }
 return result;
}

private Map<String, String> buildParams(String utf8q, String from, String to) {
 if (StringUtils.isBlank(from)) {
  from = "auto";
 }
 Map<String, String> params = new HashMap<String, String>();
 String skStr = "sk";
 String appidStr = "appid";
 if (StringUtils.isAnyBlank(skStr, appidStr)) {
  LOGGER.warn("百度翻译API接口相关参数为空!");
  return params;
 }

params.put("q", utf8q);
 params.put("from", from);
 params.put("to", to);

params.put("appid", appidStr);

// 随机数
 String salt = String.valueOf(System.currentTimeMillis());
 params.put("salt", salt);

// 签名
 String src = appidStr + utf8q + salt + skStr; // 加密前的原文
 params.put("sign", MD5Util.md5Encrypt(src).toLowerCase());
 return params;
}

public static String getUrlWithQueryString(String url, Map<String, String> params) {
 if (params == null) {
  return url;
 }

StringBuilder builder = new StringBuilder(url);
 if (url.contains("?")) {
  builder.append("&");
 } else {
  builder.append("?");
 }

int i = 0;
 for (String key : params.keySet()) {
  String value = params.get(key);
  if (value == null) { // 过滤空的key
   continue;
  }

if (i != 0) {
   builder.append('&');
  }

builder.append(key);
  builder.append('=');
  builder.append(encode(value));

i++;
 }

return builder.toString();
}

/**
 * 对输入的字符串进行URL编码, 即转换为%20这种形式
 *
 * @param input
 *   原文
 * @return URL编码. 如果编码失败, 则返回原文
 */
public static String encode(String input) {
 if (input == null) {
  return "";
 }

try {
  return URLEncoder.encode(input, "utf-8");
 } catch (UnsupportedEncodingException e) {
  e.printStackTrace();
 }

return input;
}
}

来源:https://segmentfault.com/a/1190000020299948

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com