A Look at puma's ClientPositionService


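This article walks through puma's ClientPositionService: the interface, its implementation, and the DAO, entity, and MyBatis mapper behind it.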

ClientPositionService

puma/puma/src/main/java/com/dianping/puma/biz/service/ClientPositionService.java

public interface ClientPositionService {

    List<ClientPositionEntity> findAll();

    ClientPositionEntity find(String clientName);

    void update(ClientPositionEntity clientPositionEntity, boolean flush);

    void flush();

    void cleanUpTestClients();
}

  • ClientPositionService declares five methods: findAll, find, update, flush, and cleanUpTestClients

ClientPositionServiceImpl

puma/puma/src/main/java/com/dianping/puma/biz/service/impl/ClientPositionServiceImpl.java

@Service
public class ClientPositionServiceImpl implements ClientPositionService {

    private final static Logger logger = LoggerFactory.getLogger(ClientPositionServiceImpl.class);

    @Autowired
    private ClientPositionDao clientPositionDao;

    // Positions waiting to be persisted, keyed by client name
    private Map<String, ClientPositionEntity> positionEntityMap = new ConcurrentHashMap<String, ClientPositionEntity>();

    @Override
    public List<ClientPositionEntity> findAll() {
        return clientPositionDao.findAll();
    }

    @Override
    public ClientPositionEntity find(String clientName) {
        return clientPositionDao.findByClientName(clientName);
    }

    @Override
    public void update(ClientPositionEntity clientPositionEntity, boolean flush) {
        if (flush) {
            // Write through: discard any buffered entry, then persist right away
            positionEntityMap.remove(clientPositionEntity.getClientName());
            insertOrUpdate(clientPositionEntity);
        } else {
            // Buffer only; the scheduled flush() persists it later
            positionEntityMap.put(clientPositionEntity.getClientName(), clientPositionEntity);
        }
    }

    // Runs again 5000 ms after the previous run has finished
    @Scheduled(fixedDelay = 5000)
    public void flush() {
        Set<String> keys = positionEntityMap.keySet();
        for (String key : keys) {
            ClientPositionEntity entity = positionEntityMap.remove(key);
            if (entity == null) {
                continue;
            }
            insertOrUpdate(entity);
        }
    }

    private void insertOrUpdate(ClientPositionEntity entity) {
        try {
            entity.setUpdateTime(new Date());
            int updateRow = clientPositionDao.update(entity);
            // No row was updated, so insert a new one
            if (updateRow == 0) {
                clientPositionDao.insert(entity);
            }
        } catch (Exception e) {
            // Failures are logged and not rethrown
            logger.error(e.getMessage(), e);
        }
    }

    public void cleanUpTestClients() {
        List<ClientPositionEntity> clients = clientPositionDao.findOldTestClient();
        for (ClientPositionEntity entity : clients) {
            clientPositionDao.delete(entity.getId());
        }
    }
}

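  • ClientPositionServiceImpl implements the ClientPositionService interface. findAll delegates to clientPositionDao.findAll() and find delegates to clientPositionDao.findByClientName(clientName). When flush is true, update removes the client's entry from positionEntityMap and calls insertOrUpdate immediately; when flush is false, it only puts the entity into positionEntityMap. The flush method, scheduled with fixedDelay = 5000, iterates over positionEntityMap, removes each entry, and calls insertOrUpdate(entity) on it. insertOrUpdate sets the update time, runs clientPositionDao.update, and falls back to clientPositionDao.insert when no row was updated. cleanUpTestClients deletes every record returned by clientPositionDao.findOldTestClient()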
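The buffering behaviour described above can be illustrated with a minimal, self-contained sketch. It is not puma's code: the class name PositionBufferSketch, the `store` map that stands in for the DAO, and the `writes` log are all hypothetical, and a position is reduced to a single long for brevity.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for the service; "store" replaces the real DAO. */
class PositionBufferSketch {
    // Pending positions keyed by client name; a newer update overwrites an older one.
    private final Map<String, Long> pending = new ConcurrentHashMap<>();
    // Stand-in for the database table, keyed by client name.
    final Map<String, Long> store = new ConcurrentHashMap<>();
    // Records every simulated database write, in order.
    final List<String> writes = new CopyOnWriteArrayList<>();

    void update(String clientName, long binlogPosition, boolean flush) {
        if (flush) {
            // Drop the buffered value so a later flush() does not write it over this one.
            pending.remove(clientName);
            persist(clientName, binlogPosition);
        } else {
            pending.put(clientName, binlogPosition);
        }
    }

    // In puma this runs on a schedule; in this sketch it is called by hand.
    void flush() {
        for (String key : pending.keySet()) {
            Long position = pending.remove(key);
            if (position != null) { // the entry may already have been removed elsewhere
                persist(key, position);
            }
        }
    }

    private void persist(String clientName, long binlogPosition) {
        store.put(clientName, binlogPosition);
        writes.add(clientName + "@" + binlogPosition);
    }
}
```

In this sketch, several buffered updates for the same client between two flushes collapse into a single write of the latest position, which is presumably the point of buffering frequent position acknowledgements.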

ClientPositionDao

puma/biz/src/main/java/com/dianping/puma/biz/dao/ClientPositionDao.java

public interface ClientPositionDao {

    List<ClientPositionEntity> findAll();

    ClientPositionEntity findByClientName(String clientName);

    int update(ClientPositionEntity entity);

    int insert(ClientPositionEntity entity);

    int delete(int id);

    List<ClientPositionEntity> findOldTestClient();
}

  • ClientPositionDao declares findAll, findByClientName, update, insert, delete, and findOldTestClient; update, insert, and delete return an int row count
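The service's insertOrUpdate relies on the row count returned by update to decide whether an insert is needed. A minimal sketch of that update-then-insert fallback, using a hypothetical in-memory table in place of the real DAO (UpsertSketch, its counters, and positionOf are illustrative names only):

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory DAO illustrating the update-then-insert fallback. */
class UpsertSketch {
    // Stand-in table: client name -> binlog position.
    private final Map<String, Long> table = new HashMap<>();
    int inserts = 0;
    int updates = 0;

    // Mimics UPDATE ... WHERE ClientName = ?: returns the number of rows changed.
    int update(String clientName, long position) {
        if (!table.containsKey(clientName)) {
            return 0;
        }
        table.put(clientName, position);
        updates++;
        return 1;
    }

    // Mimics INSERT: returns the number of rows added.
    int insert(String clientName, long position) {
        table.put(clientName, position);
        inserts++;
        return 1;
    }

    // Try the update first; insert only when no existing row was touched.
    void insertOrUpdate(String clientName, long position) {
        if (update(clientName, position) == 0) {
            insert(clientName, position);
        }
    }

    Long positionOf(String clientName) {
        return table.get(clientName);
    }
}
```

Note that the two steps are not atomic: two concurrent callers for a client that has no row yet could both see a count of 0 and both insert.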

ClientPositionEntity

puma/biz/src/main/java/com/dianping/puma/biz/entity/ClientPositionEntity.java

public class ClientPositionEntity extends BaseEntity {

    private String clientName;

    private String binlogFile;

    private long binlogPosition;

    private long serverId;

    private int eventIndex;

    private long timestamp;

    //......
}

  • ClientPositionEntity extends BaseEntity and defines the clientName, binlogFile, binlogPosition, serverId, eventIndex, and timestamp properties

ClientPositionMapper

puma/biz/src/main/resources/sqlmap/ClientPositionMapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dianping.puma.biz.dao.ClientPositionDao">

    <select id="findAll" resultType="ClientPositionEntity">
        SELECT * FROM ClientPosition
    </select>

    <select id="findByClientName" resultType="ClientPositionEntity">
        SELECT * FROM ClientPosition where ClientName = #{clientName}
    </select>

    <update id="update" parameterType="ClientPositionEntity">
        update ClientPosition
        set
            BinlogFile = #{binlogFile},
            BinlogPosition = #{binlogPosition},
            ServerId = #{serverId},
            EventIndex = #{eventIndex},
            Timestamp = #{timestamp},
            UpdateTime = #{updateTime}
        where
            ClientName = #{clientName}
    </update>

    <insert id="insert" parameterType="ClientPositionEntity" useGeneratedKeys="true" keyProperty="id">
        insert into ClientPosition
        (
            ClientName,
            BinlogFile,
            BinlogPosition,
            ServerId,
            EventIndex,
            Timestamp,
            UpdateTime
        )
        values
        (
            #{clientName},
            #{binlogFile},
            #{binlogPosition},
            #{serverId},
            #{eventIndex},
            #{timestamp},
            #{updateTime}
        )
    </insert>

    <select id="findOldTestClient" resultType="ClientPositionEntity">
        select * from ClientPosition
        where UpdateTime &lt; NOW() - INTERVAL 10 DAY
        and ClientName like "%test"
    </select>

    <delete id="delete" parameterType="int">
        delete from ClientPosition where id = #{id}
    </delete>

</mapper>

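  • ClientPositionMapper.xml supplies the SQL for every method declared in ClientPositionDao; findOldTestClient selects rows whose UpdateTime is more than 10 days old and whose ClientName matches the pattern "%test"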
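To make the findOldTestClient condition concrete, here is a rough Java equivalent of its WHERE clause. The class and method names are hypothetical, and the sketch makes two simplifying assumptions: "10 DAY" is treated as exactly 240 hours, and the name match is case-sensitive, whereas MySQL's LIKE may be case-insensitive depending on the column collation.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical Java rendering of the findOldTestClient filter. */
class StaleTestClientSketch {
    // Mirrors: UpdateTime < NOW() - INTERVAL 10 DAY AND ClientName LIKE "%test"
    static boolean isOldTestClient(String clientName, Instant updateTime, Instant now) {
        Instant cutoff = now.minus(Duration.ofDays(10));
        // LIKE "%test" has a wildcard only at the front, so it matches names ending in "test".
        return updateTime.isBefore(cutoff) && clientName.endsWith("test");
    }
}
```

Because the pattern has no trailing wildcard, a name such as "test-client" is not matched even when its record is old; only names that end in "test" qualify for cleanup.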

Summary

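ClientPositionService declares findAll, find, update, flush, and cleanUpTestClients. ClientPositionServiceImpl implements the interface: findAll and find delegate to clientPositionDao.findAll() and clientPositionDao.findByClientName(clientName). With flush set to true, update removes the client's entry from positionEntityMap and persists the entity immediately through insertOrUpdate; with flush set to false, it only puts the entity into positionEntityMap. The scheduled flush method iterates over positionEntityMap, removes each entry, and calls insertOrUpdate(entity) on it.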

