PutEntrustDataBiz.java
5.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package com.cjs.cms.biz.elasticsearch;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkProcessor.Builder;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.cjs.cms.dao.report.EsPutEntrustDataDao;
import com.cjs.cms.dao.report.OtcRealTimeDao;
import com.cjs.cms.model.report.EsPutEntrustDataInfo;
import com.cjs.cms.model.report.TradeRecordInfo;
import com.cjs.cms.util.elasticsearch.EsClient;
import com.cjs.cms.util.lang.DateEnum;
import com.cjs.cms.util.lang.DateUtil;
import com.cjs.cms.util.lang.JsonUtil;
import com.cjs.cms.util.lang.PageUtils;
/**
 * Adds daily trade (deal) records to Elasticsearch.
 *
 * <p>Trade rows are read page by page through {@link OtcRealTimeDao}, serialized to JSON and
 * indexed through a {@link BulkProcessor}. After each run a bookkeeping record (run date,
 * number of succeeded/failed bulk batches, last assigned id) is stored through
 * {@link EsPutEntrustDataDao}.
 *
 * @author tongyufu
 */
@Service
public class PutEntrustDataBiz {

    /** Start date (yyyyMMdd) used when no previous import has been recorded. */
    private static final int DEFAULT_START_DATE = 20150101;

    /** Number of rows requested per page from the trade query. */
    private static final int PAGE_SIZE = 1000;

    Logger log = LogManager.getLogger();

    @Autowired
    private OtcRealTimeDao otcRealTimeDao;

    @Autowired
    private EsPutEntrustDataDao esPutEntrustDataDao;

    /**
     * Number of bulk batches that succeeded in the most recent run. Kept only for backward
     * compatibility; counting during a run is done with per-run atomic counters.
     */
    static Integer _successAmount = 0;

    /**
     * Number of bulk batches that failed in the most recent run. Kept only for backward
     * compatibility; counting during a run is done with per-run atomic counters.
     */
    static Integer _failAmount = 0;

    /**
     * Imports trade records unless an import has already been recorded for today.
     *
     * <p>The start date is the date of the last recorded import, or
     * {@link #DEFAULT_START_DATE} when nothing has been recorded yet.
     */
    public void putData() {
        Integer putDate = Integer.valueOf(DateUtil.getNow(DateEnum.UNSIGNED_DATE));
        Integer lastPutDate = esPutEntrustDataDao.queryLatPutDate();
        log.info("开始向ES添加成交记录:" + lastPutDate);
        // Compare by value: '==' on two boxed Integers compares references, which is false
        // for equal values outside the small Integer cache (dates such as 20240101 are).
        // equals() is also safe when lastPutDate is null.
        if (putDate.equals(lastPutDate)) {
            log.info("已执行添加成交记录操作:" + putDate);
            return;
        }
        if (lastPutDate == null) {
            lastPutDate = DEFAULT_START_DATE;
        }
        this.putEntrustData(lastPutDate);
    }

    /**
     * Adds trade data to Elasticsearch.
     *
     * <p>Nothing is indexed or recorded when the trade query reports zero rows. If the calling
     * thread is interrupted, the interrupt flag is restored, the bulk processor is closed and
     * no bookkeeping record is saved.
     *
     * @param putDate all data dated on or after this date is added to ES
     */
    public void putEntrustData(Integer putDate) {
        _successAmount = 0;
        _failAmount = 0;

        EsPutEntrustDataInfo putInfo = esPutEntrustDataDao.queryByPutDate(putDate);
        Map<String, Object> params = new HashMap<String, Object>();
        params.put("startDate", putDate);
        int total = otcRealTimeDao.queryTradeTotal(params);
        if (total == 0) {
            // Nothing to import: return before a bulk processor is created so that none
            // is left open.
            return;
        }

        // Integer ceiling of total / PAGE_SIZE without going through floating point.
        int pageCount = total / PAGE_SIZE + (total % PAGE_SIZE == 0 ? 0 : 1);
        int lastId = putInfo == null ? 0 : putInfo.getLastId();

        // Per-run counters: listener callbacks may run on threads other than this one.
        final AtomicInteger succeededBatches = new AtomicInteger();
        final AtomicInteger failedBatches = new AtomicInteger();

        TransportClient client = EsClient.getClient();
        BulkProcessor bulk = buildBulkProcessor(client, succeededBatches, failedBatches);
        try {
            params.put("rows", PAGE_SIZE);
            for (int i = 0; i < pageCount; i++) {
                params.put("page", i + 1);
                PageUtils.processOralcePage(params);
                List<TradeRecordInfo> trades = otcRealTimeDao.queryTrade(params);
                for (TradeRecordInfo trade : trades) {
                    lastId++;
                    trade.setId(lastId);
                    bulk.add(new IndexRequest(EsClient.INDEX, EsClient.TYPE)
                            .source(JsonUtil.toJson(trade)));
                }
                // Pause between pages (kept from the original implementation).
                TimeUnit.SECONDS.sleep(2);
            }

            // Close after the given time. Returns true if all requests finished before the
            // timeout, false otherwise. close() would exit immediately without waiting.
            boolean drained = bulk.awaitClose(90, TimeUnit.MINUTES);
            if (!drained) {
                log.warn("Bulk processor did not finish all requests within 90 minutes");
            }

            _successAmount = succeededBatches.get();
            _failAmount = failedBatches.get();

            putInfo = new EsPutEntrustDataInfo();
            putInfo.setPutDate(Integer.valueOf(DateUtil.getNow(DateEnum.UNSIGNED_DATE)));
            putInfo.setSuccessAmount(_successAmount);
            putInfo.setFailAmount(_failAmount);
            putInfo.setLastId(lastId);
            esPutEntrustDataDao.save(putInfo);
            log.info("=====添加到ES记录数:" + total);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("Interrupted while adding trade records to ES", e);
        } finally {
            // Ensures the processor is released on every exit path; expected to be a no-op
            // when awaitClose() above has already closed it.
            bulk.close();
        }
    }

    /**
     * Builds the bulk processor used for one import run.
     *
     * @param client    Elasticsearch client the bulk requests are sent through
     * @param succeeded counter incremented for each bulk batch that completed without failures
     * @param failed    counter incremented for each bulk batch that threw or reported failures
     * @return a configured, ready-to-use bulk processor
     */
    private BulkProcessor buildBulkProcessor(TransportClient client,
            final AtomicInteger succeeded, final AtomicInteger failed) {
        Builder builder = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // Nothing to prepare before a batch is sent.
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // Pass the throwable so the stack trace is not lost.
                log.error("执行出错:" + failure.getMessage(), failure);
                failed.incrementAndGet();
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request,
                                  BulkResponse response) {
                // A bulk call can complete while individual items were rejected; such a
                // batch must not be reported as a success.
                if (response.hasFailures()) {
                    log.error("执行出错:" + response.buildFailureMessage());
                    failed.incrementAndGet();
                } else {
                    succeeded.incrementAndGet();
                }
            }
        });
        // Number of actions per batch (default 1000).
        builder.setBulkActions(1000);
        // Flush every 10 MB (default 5 MB).
        builder.setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB));
        // Flush every 5 seconds regardless of the number of pending requests.
        builder.setFlushInterval(TimeValue.timeValueSeconds(5));
        // Number of concurrent requests; 0 means single-threaded.
        builder.setConcurrentRequests(1);
        // Start with a 100 ms delay and retry at most 3 times. If it still fails an
        // EsRejectedExecutionException is raised, meaning there were not enough resources
        // to process the request. To disable retries use BackoffPolicy.noBackoff().
        builder.setBackoffPolicy(
                BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
        return builder.build();
    }
}