A Look at puma's ClientPositionService
Preface
This article looks at puma's ClientPositionService.
ClientPositionService
puma/puma/src/main/java/com/dianping/puma/biz/service/ClientPositionService.java
public interface ClientPositionService {
    List<ClientPositionEntity> findAll();
    ClientPositionEntity find(String clientName);
    void update(ClientPositionEntity clientPositionEntity, boolean flush);
    void flush();
    void cleanUpTestClients();
}
- ClientPositionService declares five methods: findAll, find, update, flush, and cleanUpTestClients.
ClientPositionServiceImpl
puma/puma/src/main/java/com/dianping/puma/biz/service/impl/ClientPositionServiceImpl.java
@Service
public class ClientPositionServiceImpl implements ClientPositionService {
    private final static Logger logger = LoggerFactory.getLogger(ClientPositionServiceImpl.class);

    @Autowired
    private ClientPositionDao clientPositionDao;

    private Map<String, ClientPositionEntity> positionEntityMap = new ConcurrentHashMap<String, ClientPositionEntity>();

    @Override
    public List<ClientPositionEntity> findAll() {
        return clientPositionDao.findAll();
    }

    @Override
    public ClientPositionEntity find(String clientName) {
        return clientPositionDao.findByClientName(clientName);
    }

    @Override
    public void update(ClientPositionEntity clientPositionEntity, boolean flush) {
        if (flush) {
            positionEntityMap.remove(clientPositionEntity.getClientName());
            insertOrUpdate(clientPositionEntity);
        } else {
            positionEntityMap.put(clientPositionEntity.getClientName(), clientPositionEntity);
        }
    }

    @Scheduled(fixedDelay = 5000)
    public void flush() {
        Set<String> keys = positionEntityMap.keySet();
        for (String key : keys) {
            ClientPositionEntity entity = positionEntityMap.remove(key);
            if (entity == null) {
                continue;
            }
            insertOrUpdate(entity);
        }
    }

    private void insertOrUpdate(ClientPositionEntity entity) {
        try {
            entity.setUpdateTime(new Date());
            int updateRow = clientPositionDao.update(entity);
            if (updateRow == 0) {
                clientPositionDao.insert(entity);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    public void cleanUpTestClients() {
        List<ClientPositionEntity> clients = clientPositionDao.findOldTestClient();
        for (ClientPositionEntity entity : clients) {
            clientPositionDao.delete(entity.getId());
        }
    }
}
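- ClientPositionServiceImpl implements the ClientPositionService interface. findAll delegates to clientPositionDao.findAll(), and find delegates to clientPositionDao.findByClientName(clientName). When flush is true, update removes the client's entry from positionEntityMap and calls insertOrUpdate to persist immediately; when flush is false, it only puts the entity into positionEntityMap. The flush method, scheduled with a fixed delay of 5000 ms, iterates over the keys of positionEntityMap, removes each entry, and calls insertOrUpdate(entity) on it. insertOrUpdate sets the update time, calls clientPositionDao.update, and falls back to clientPositionDao.insert when no row was updated; any exception is logged rather than rethrown. cleanUpTestClients deletes every entity returned by clientPositionDao.findOldTestClient().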
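The update/flush pair described above amounts to a small write-behind buffer. The following is a minimal sketch of that idea only, not puma's code: Position and PositionBuffer are hypothetical names, the store list stands in for the database writes done by insertOrUpdate, and flush is called by hand instead of by @Scheduled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for ClientPositionEntity: only the fields needed here.
final class Position {
    final String clientName;
    final long binlogPosition;

    Position(String clientName, long binlogPosition) {
        this.clientName = clientName;
        this.binlogPosition = binlogPosition;
    }
}

// Hypothetical sketch of the buffering done in ClientPositionServiceImpl.
final class PositionBuffer {
    // Latest unflushed position per client name.
    private final Map<String, Position> pending = new ConcurrentHashMap<>();
    // Stand-in for the database: every "persisted" write is appended here.
    final List<Position> store = new ArrayList<>();

    void update(Position p, boolean flush) {
        if (flush) {
            // Write through: discard any buffered value for this client, then persist now.
            pending.remove(p.clientName);
            store.add(p);
        } else {
            // Buffer only: a later update for the same client overwrites this one.
            pending.put(p.clientName, p);
        }
    }

    // Drains the buffer; in puma the equivalent method runs on a 5000 ms fixed delay.
    void flush() {
        for (String key : pending.keySet()) {
            Position p = pending.remove(key);
            if (p != null) {
                store.add(p);
            }
        }
    }

    int pendingSize() {
        return pending.size();
    }
}
```

With this shape, repeated non-flushing updates for one client collapse into a single write per flush cycle. The trade-off is that a buffered position lives only in memory until the next flush, so callers that need the position persisted right away would pass flush = true.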
ClientPositionDao
puma/biz/src/main/java/com/dianping/puma/biz/dao/ClientPositionDao.java
public interface ClientPositionDao {
    List<ClientPositionEntity> findAll();
    ClientPositionEntity findByClientName(String clientName);
    int update(ClientPositionEntity entity);
    int insert(ClientPositionEntity entity);
    int delete(int id);
    List<ClientPositionEntity> findOldTestClient();
}
- ClientPositionDao declares the findAll, findByClientName, update, insert, delete, and findOldTestClient methods.
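Because update and insert both return an int, the service can build an "update first, insert if nothing matched" upsert on top of this DAO. Here is a minimal sketch of that pattern against a hypothetical in-memory DAO; InMemoryPositionDao and Upsert are illustrative names, and the row is reduced to a client name plus a binlog position.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for ClientPositionDao, keyed by client name.
final class InMemoryPositionDao {
    private final Map<String, Long> rows = new HashMap<>();

    // Returns the number of rows changed: 0 when no row exists for the client.
    int update(String clientName, long binlogPosition) {
        if (!rows.containsKey(clientName)) {
            return 0;
        }
        rows.put(clientName, binlogPosition);
        return 1;
    }

    // Adds a row for the client and reports one inserted row.
    int insert(String clientName, long binlogPosition) {
        rows.put(clientName, binlogPosition);
        return 1;
    }

    Long find(String clientName) {
        return rows.get(clientName);
    }

    int count() {
        return rows.size();
    }
}

final class Upsert {
    // Same shape as insertOrUpdate: try update first, insert only if no row was updated.
    static void insertOrUpdate(InMemoryPositionDao dao, String clientName, long binlogPosition) {
        int updated = dao.update(clientName, binlogPosition);
        if (updated == 0) {
            dao.insert(clientName, binlogPosition);
        }
    }
}
```

Note that the two steps are not atomic: two writers saving a brand-new client at the same moment could both see zero updated rows and both insert. Whether that can produce duplicate rows depends on the table's constraints, which are not shown in the source.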
ClientPositionEntity
puma/biz/src/main/java/com/dianping/puma/biz/entity/ClientPositionEntity.java
public class ClientPositionEntity extends BaseEntity {
    private String clientName;
    private String binlogFile;
    private long binlogPosition;
    private long serverId;
    private int eventIndex;
    private long timestamp;
    //......
}
- ClientPositionEntity extends BaseEntity and defines the clientName, binlogFile, binlogPosition, serverId, eventIndex, and timestamp fields.
ClientPositionMapper
puma/biz/src/main/resources/sqlmap/ClientPositionMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dianping.puma.biz.dao.ClientPositionDao">
    <select id="findAll" resultType="ClientPositionEntity">
        SELECT * FROM ClientPosition
    </select>
    <select id="findByClientName" resultType="ClientPositionEntity">
        SELECT * FROM ClientPosition where ClientName = #{clientName}
    </select>
    <update id="update" parameterType="ClientPositionEntity">
        update ClientPosition
        set
        BinlogFile = #{binlogFile},
        BinlogPosition = #{binlogPosition},
        ServerId = #{serverId},
        EventIndex = #{eventIndex},
        Timestamp = #{timestamp},
        UpdateTime = #{updateTime}
        where
        ClientName = #{clientName}
    </update>
    <insert id="insert" parameterType="ClientPositionEntity" useGeneratedKeys="true" keyProperty="id">
        insert into ClientPosition
        (
        ClientName,
        BinlogFile,
        BinlogPosition,
        ServerId,
        EventIndex,
        Timestamp,
        UpdateTime
        )
        values
        (
        #{clientName},
        #{binlogFile},
        #{binlogPosition},
        #{serverId},
        #{eventIndex},
        #{timestamp},
        #{updateTime}
        )
    </insert>
    <select id="findOldTestClient" resultType="ClientPositionEntity">
        select * from ClientPosition
        where UpdateTime &lt; NOW() - INTERVAL 10 DAY
        and ClientName like "%test"
    </select>
    <delete id="delete" parameterType="int">
        delete from ClientPosition where id = #{id}
    </delete>
</mapper>
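- ClientPositionMapper.xml supplies the SQL for each method declared in ClientPositionDao; findOldTestClient selects rows whose UpdateTime is more than 10 days old and whose ClientName ends with "test".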
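To make the findOldTestClient condition concrete, here is a rough Java rendering of its WHERE clause. TestClientFilter is a hypothetical helper that does not exist in puma, and the current time is passed in explicitly so the check is deterministic.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper mirroring the WHERE clause of findOldTestClient.
final class TestClientFilter {
    // SQL: UpdateTime < NOW() - INTERVAL 10 DAY and ClientName like "%test"
    // The LIKE pattern has a wildcard only on the left, so it matches names that END with "test".
    static boolean isOldTestClient(String clientName, Instant updateTime, Instant now) {
        boolean stale = updateTime.isBefore(now.minus(Duration.ofDays(10)));
        return stale && clientName.endsWith("test");
    }
}
```

One difference to keep in mind: endsWith is case-sensitive, while LIKE in MySQL is case-insensitive under the commonly used case-insensitive collations. The collation of the ClientName column is not shown in the source, so this sketch simply assumes exact-case matching.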
Summary
ClientPositionService declares the findAll, find, update, flush, and cleanUpTestClients methods, and ClientPositionServiceImpl implements them. findAll and find delegate to clientPositionDao.findAll() and clientPositionDao.findByClientName(clientName). update persists immediately when flush is true (positionEntityMap.remove followed by insertOrUpdate) and only buffers the entity in positionEntityMap when flush is false. The scheduled flush method iterates over positionEntityMap, removes each entry, and calls insertOrUpdate(entity) on it.