activiti高并发之id生成器
默认的id生成策略
默认采用的是org.activiti.engine.impl.db.DbIdGenerator
public class DbIdGenerator implements IdGenerator {
protected int idBlockSize;
protected long nextId = 0;
protected long lastId = -1;
protected CommandExecutor commandExecutor;
protected CommandConfig commandConfig;
public synchronized String getNextId() {
if (lastId
每次从数据库中取出一段
public class GetNextIdBlockCmd implements Command {
private static final long serialVersionUID = 1L;
protected int idBlockSize;
public GetNextIdBlockCmd(int idBlockSize) {
this.idBlockSize = idBlockSize;
}
public IdBlock execute(CommandContext commandContext) {
PropertyEntity property = (PropertyEntity) commandContext
.getPropertyEntityManager()
.findPropertyById("next.dbid");
long oldValue = Long.parseLong(property.getValue());
long newValue = oldValue+idBlockSize;
property.setValue(Long.toString(newValue));
return new IdBlock(oldValue, newValue-1);
}
}
在CommandContextInterceptor里头,拦截了命令
public class CommandContextInterceptor extends AbstractCommandInterceptor {
private static final Logger log = LoggerFactory.getLogger(CommandContextInterceptor.class);
protected CommandContextFactory commandContextFactory;
protected ProcessEngineConfigurationImpl processEngineConfiguration;
public CommandContextInterceptor() {
}
public CommandContextInterceptor(CommandContextFactory commandContextFactory, ProcessEngineConfigurationImpl processEngineConfiguration) {
this.commandContextFactory = commandContextFactory;
this.processEngineConfiguration = processEngineConfiguration;
}
public T execute(CommandConfig config, Command command) {
CommandContext context = Context.getCommandContext();
boolean contextReused = false;// We need to check the exception, because the transaction can be in a rollback state,// and some other command is being fired to compensate (eg. decrementing job retries)if (!config.isContextReusePossible() || context == null || context.getException() != null) { context = commandContextFactory.createCommandContext(command); } else { log.debug("Valid context found. Reusing it for the current command '{}'", command.getClass().getCanonicalName()); contextReused = true;}try { // Push on stack Context.setCommandContext(context); Context.setProcessEngineConfiguration(processEngineConfiguration); return next.execute(config, command);} catch (Exception e) { context.exception(e);} finally { try { if (!contextReused) { context.close(); } } finally { // Pop from stack Context.removeCommandContext(); Context.removeProcessEngineConfiguration(); Context.removeBpmnOverrideContext(); }}return null;
}
public CommandContextFactory getCommandContextFactory() {
return commandContextFactory;
}
public void setCommandContextFactory(CommandContextFactory commandContextFactory) {
this.commandContextFactory = commandContextFactory;
}
public ProcessEngineConfigurationImpl getProcessEngineConfiguration() {
return processEngineConfiguration;
}
public void setProcessEngineContext(ProcessEngineConfigurationImpl processEngineContext) {
this.processEngineConfiguration = processEngineContext;
}
里头的finally里头,有个context.close方法
public void close() {
// the intention of this method is that all resources are closed properly, even
// if exceptions occur in close or flush methods of the sessions or the
// transaction context.
try { try { try { if (exception == null && closeListeners != null) { try { for (CommandContextCloseListener listener : closeListeners) { listener.closing(this); } } catch (Throwable exception) { exception(exception); } } if (exception == null) { flushSessions(); } } catch (Throwable exception) { exception(exception); } finally { try { if (exception == null) { transactionContext.commit(); } } catch (Throwable exception) { exception(exception); } if (exception == null && closeListeners != null) { try { for (CommandContextCloseListener listener : closeListeners) { listener.closed(this); } } catch (Throwable exception) { exception(exception); } } if (exception != null) { if (exception instanceof JobNotFoundException || exception instanceof ActivitiTaskAlreadyClaimedException) { // reduce log level, because this may have been caused because of job deletion due to cancelActiviti="true" log.info("Error while closing command context", exception); } else if (exception instanceof ActivitiOptimisticLockingException) { // reduce log level, as normally we're not interested in logging this exception log.debug("Optimistic locking exception : " + exception); } else { log.debug("Error while closing command context", exception); } transactionContext.rollback(); } } } catch (Throwable exception) { exception(exception); } finally { closeSessions(); }} catch (Throwable exception) { exception(exception);} // rethrow the original exception if there was oneif (exception != null) { if (exception instanceof Error) { throw (Error) exception; } else if (exception instanceof RuntimeException) { throw (RuntimeException) exception; } else { throw new ActivitiException("exception while executing command " + command, exception); }}
}
该方法会去flushSessions
public void flush() {
List removedOperations = removeUnnecessaryOperations();
flushDeserializedObjects();List updatedObjects = getUpdatedObjects();if (log.isDebugEnabled()) { Collection> insertedObjectLists = insertedObjects.values(); int nrOfInserts = 0, nrOfUpdates = 0, nrOfDeletes = 0; for (List insertedObjectList: insertedObjectLists) { for (PersistentObject insertedObject : insertedObjectList) { log.debug(" insert {}", insertedObject); nrOfInserts++; } } for (PersistentObject updatedObject: updatedObjects) { log.debug(" update {}", updatedObject); nrOfUpdates++; } for (DeleteOperation deleteOperation: deleteOperations) { log.debug(" {}", deleteOperation); nrOfDeletes++; } log.debug("flush summary: {} insert, {} update, {} delete.", nrOfInserts, nrOfUpdates, nrOfDeletes); log.debug("now executing flush...");}flushInserts();flushUpdates(updatedObjects);flushDeletes(removedOperations);
}
会去flushUpdates。
因此如果在高并发的场景下,可能一个线程读取一段block之后,还没有来得及update,已经被另一线程读取,造成id已经被占用的情况。为解决高并发的问题,可以采用uuid策略。
uuid策略
org.activiti.engine.impl.persistence.StrongUuidGenerator
public class StrongUuidGenerator implements IdGenerator {
// different ProcessEngines on the same classloader share one generator.
protected static TimeBasedGenerator timeBasedGenerator;
public StrongUuidGenerator() {
ensureGeneratorInitialized();
}
protected void ensureGeneratorInitialized() {
if (timeBasedGenerator == null) {
synchronized (StrongUuidGenerator.class) {
if (timeBasedGenerator == null) {
timeBasedGenerator = Generators.timeBasedGenerator(EthernetAddress.fromInterface());
}
}
}
}
public String getNextId() {
return timeBasedGenerator.generate().toString();
}
}
采用的是com.fasterxml.uuid.impl.TimeBasedGenerator
参考
- UUID id generator for high concurrency
关键字:activiti
版权声明
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处。如若内容有涉嫌抄袭侵权/违法违规/事实不符,请点击 举报 进行投诉反馈!