// PutEntrustDataBiz.java 5.87 KB
package com.cjs.cms.biz.elasticsearch;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkProcessor.Builder;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.cjs.cms.dao.report.EsPutEntrustDataDao;
import com.cjs.cms.dao.report.OtcRealTimeDao;
import com.cjs.cms.model.report.EsPutEntrustDataInfo;
import com.cjs.cms.model.report.TradeRecordInfo;
import com.cjs.cms.util.elasticsearch.EsClient;
import com.cjs.cms.util.lang.DateEnum;
import com.cjs.cms.util.lang.DateUtil;
import com.cjs.cms.util.lang.JsonUtil;
import com.cjs.cms.util.lang.PageUtils;

/**
 * Adds the daily trade (deal) records to Elasticsearch.
 * <p>
 * Trades are read page by page through {@link OtcRealTimeDao}, indexed via an
 * Elasticsearch {@link BulkProcessor}, and a summary row describing the run is
 * persisted through {@link EsPutEntrustDataDao}.
 *
 * @author tongyufu
 *
 */
@Service
public class PutEntrustDataBiz {

    /** Start date (yyyyMMdd) used when no previous import has been recorded. */
    private static final int    DEFAULT_PUT_DATE       = 20150101;
    /** Number of rows fetched from the database per page. */
    private static final int    PAGE_SIZE              = 1000;
    /** Number of index actions that triggers a bulk flush. */
    private static final int    BULK_ACTIONS           = 1000;
    /** Accumulated request size, in MB, that triggers a bulk flush. */
    private static final int    BULK_SIZE_MB           = 10;
    /** Interval, in seconds, after which pending actions are flushed regardless of count. */
    private static final int    FLUSH_INTERVAL_SECONDS = 5;
    /** Pause, in seconds, between two database pages. */
    private static final int    PAGE_PAUSE_SECONDS     = 2;
    /** Maximum time, in minutes, to wait for outstanding bulk requests on close. */
    private static final int    CLOSE_TIMEOUT_MINUTES  = 90;

    Logger                      log            = LogManager.getLogger();
    @Autowired
    private OtcRealTimeDao      otcRealTimeDao;
    @Autowired
    private EsPutEntrustDataDao esPutEntrustDataDao;
    /** Number of bulk batches that completed in the most recent run (kept for compatibility). */
    static Integer              _successAmount = 0;
    /** Number of bulk batches that failed in the most recent run (kept for compatibility). */
    static Integer              _failAmount    = 0;

    /**
     * Entry point of the daily import: imports every trade since the last
     * recorded import date, unless an import has already been recorded for today.
     */
    public void putData() {
        Integer putDate = Integer.valueOf(DateUtil.getNow(DateEnum.UNSIGNED_DATE));
        Integer lastPutDate = esPutEntrustDataDao.queryLatPutDate();

        log.info("开始向ES添加成交记录:" + lastPutDate);
        // Compare by value: '==' on boxed Integers compares references and is
        // false for date-sized numbers. equals() also copes with a null lastPutDate.
        if (putDate.equals(lastPutDate)) {
            log.info("已执行添加成交记录操作:" + putDate);
            return;
        }
        if (lastPutDate == null) {
            lastPutDate = DEFAULT_PUT_DATE;
        }
        this.putEntrustData(lastPutDate);
    }

    /**
     * Indexes trade records into Elasticsearch and stores a summary of the run.
     *
     * @param putDate every record dated on or after this date is added to ES
     */
    public void putEntrustData(Integer putDate) {
        _successAmount = 0;
        _failAmount = 0;

        Map<String, Object> params = new HashMap<String, Object>();
        params.put("startDate", putDate);
        int total = otcRealTimeDao.queryTradeTotal(params);
        // Nothing to import: leave before any bulk processor is created so
        // no flush scheduler is left running.
        if (total <= 0) {
            return;
        }

        EsPutEntrustDataInfo putInfo = esPutEntrustDataDao.queryByPutDate(putDate);
        int lastId = putInfo == null ? 0 : putInfo.getLastId();
        // Integer ceiling division; widened to long so the addition cannot overflow.
        int pageCount = (int) (((long) total + PAGE_SIZE - 1) / PAGE_SIZE);

        // Per-run counters: the listener callbacks may run on other threads,
        // so plain boxed Integer increments would not be safe.
        final AtomicInteger successCount = new AtomicInteger();
        final AtomicInteger failCount = new AtomicInteger();
        BulkProcessor bulk = createBulkProcessor(EsClient.getClient(), successCount, failCount);
        try {
            params.put("rows", PAGE_SIZE);
            for (int page = 1; page <= pageCount; page++) {
                params.put("page", page);
                PageUtils.processOralcePage(params);
                List<TradeRecordInfo> trades = otcRealTimeDao.queryTrade(params);
                for (TradeRecordInfo trade : trades) {
                    lastId++;
                    trade.setId(lastId);
                    bulk.add(new IndexRequest(EsClient.INDEX, EsClient.TYPE)
                        .source(JsonUtil.toJson(trade)));
                }
                TimeUnit.SECONDS.sleep(PAGE_PAUSE_SECONDS);
            }
            // awaitClose flushes pending actions and waits up to the timeout for
            // in-flight requests; it returns false when the timeout elapsed first.
            if (!bulk.awaitClose(CLOSE_TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
                log.warn("等待ES批量请求完成超时(分钟):" + CLOSE_TIMEOUT_MINUTES);
            }

            _successAmount = successCount.get();
            _failAmount = failCount.get();

            putInfo = new EsPutEntrustDataInfo();
            putInfo.setPutDate(Integer.valueOf(DateUtil.getNow(DateEnum.UNSIGNED_DATE)));
            putInfo.setSuccessAmount(_successAmount);
            putInfo.setFailAmount(_failAmount);
            putInfo.setLastId(lastId);
            esPutEntrustDataDao.save(putInfo);
            log.info("=====添加到ES记录数:" + total);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("向ES添加成交记录被中断", e);
        } finally {
            // Harmless after a successful awaitClose; releases the processor
            // on the interrupted/exceptional paths.
            bulk.close();
        }
    }

    /**
     * Builds the bulk processor used for one import run.
     *
     * @param client       Elasticsearch client the bulk requests are sent through
     * @param successCount incremented once for each bulk batch that returns a response
     * @param failCount    incremented once for each bulk batch that fails with an exception
     * @return a configured, open bulk processor; the caller must close it
     */
    private BulkProcessor createBulkProcessor(TransportClient client,
                                              final AtomicInteger successCount,
                                              final AtomicInteger failCount) {
        Builder build = BulkProcessor.builder(client, new BulkProcessor.Listener() {

            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                log.error("执行出错:" + failure.getMessage(), failure);
                failCount.incrementAndGet();
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request,
                                  BulkResponse response) {
                // A response can still carry per-item failures; report them.
                if (response.hasFailures()) {
                    log.warn("批量请求部分失败:" + response.buildFailureMessage());
                }
                successCount.incrementAndGet();
            }
        });
        // Flush after this many actions.
        build.setBulkActions(BULK_ACTIONS);
        // Flush once the accumulated request size reaches this many MB.
        build.setBulkSize(new ByteSizeValue(BULK_SIZE_MB, ByteSizeUnit.MB));
        // Flush at this interval no matter how many actions are pending.
        build.setFlushInterval(TimeValue.timeValueSeconds(FLUSH_INTERVAL_SECONDS));
        // Number of concurrent bulk requests; 0 would execute them synchronously.
        build.setConcurrentRequests(1);
        // Retry with exponential backoff starting at 100 ms, at most 3 times.
        // To disable retries use BackoffPolicy.noBackoff().
        build.setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
        return build.build();
    }
}