Activiti 高并发之 id 生成器

默认的id生成策略

默认采用的是org.activiti.engine.impl.db.DbIdGenerator

public class DbIdGenerator implements IdGenerator {

protected int idBlockSize;
protected long nextId = 0;
protected long lastId = -1;

protected CommandExecutor commandExecutor;
protected CommandConfig commandConfig;

public synchronized String getNextId() {
if (lastId
每次从数据库中取出一段 id(一个 IdBlock),对应的命令是 GetNextIdBlockCmd:

/**
 * Command that reserves the next block of database ids.
 *
 * <p>It reads the {@code next.dbid} property, advances it by the configured
 * block size and returns the reserved range as an {@link IdBlock}. The command
 * only changes the value on the in-memory {@link PropertyEntity}; nothing in
 * this class writes it to the database.
 */
public class GetNextIdBlockCmd implements Command<IdBlock> {

  private static final long serialVersionUID = 1L;

  /** Number of ids reserved by one execution of this command. */
  protected int idBlockSize;

  /**
   * @param idBlockSize how many ids a single execution reserves
   */
  public GetNextIdBlockCmd(int idBlockSize) {
    this.idBlockSize = idBlockSize;
  }

  /**
   * Reserves {@code idBlockSize} ids starting at the current value of the
   * {@code next.dbid} property.
   *
   * @param commandContext context used to look up the property entity
   * @return a block built from the old property value and the new value minus one
   *         (presumably the inclusive first and last id of the block; confirm
   *         against {@code IdBlock})
   * @throws NumberFormatException if the stored property value is not a valid long
   */
  public IdBlock execute(CommandContext commandContext) {
    PropertyEntity property = (PropertyEntity) commandContext
        .getPropertyEntityManager()
        .findPropertyById("next.dbid");
    long oldValue = Long.parseLong(property.getValue());
    long newValue = oldValue + idBlockSize;
    // Move the counter past the reserved block so the next reader starts after it.
    property.setValue(Long.toString(newValue));
    return new IdBlock(oldValue, newValue - 1);
  }
}
命令在执行时会被 CommandContextInterceptor 拦截:

public class CommandContextInterceptor extends AbstractCommandInterceptor {
private static final Logger log = LoggerFactory.getLogger(CommandContextInterceptor.class);

protected CommandContextFactory commandContextFactory;
protected ProcessEngineConfigurationImpl processEngineConfiguration;

public CommandContextInterceptor() {
}

public CommandContextInterceptor(CommandContextFactory commandContextFactory, ProcessEngineConfigurationImpl processEngineConfiguration) {
this.commandContextFactory = commandContextFactory;
this.processEngineConfiguration = processEngineConfiguration;
}

public T execute(CommandConfig config, Command command) {
CommandContext context = Context.getCommandContext();

boolean contextReused = false;// We need to check the exception, because the transaction can be in a rollback state,// and some other command is being fired to compensate (eg. decrementing job retries)if (!config.isContextReusePossible() || context == null || context.getException() != null) {     context = commandContextFactory.createCommandContext(command);        }  else {    log.debug("Valid context found. Reusing it for the current command '{}'", command.getClass().getCanonicalName());    contextReused = true;}try {  // Push on stack  Context.setCommandContext(context);  Context.setProcessEngineConfiguration(processEngineConfiguration);  return next.execute(config, command);} catch (Exception e) {  context.exception(e);} finally {  try {      if (!contextReused) {          context.close();      }  } finally {      // Pop from stack      Context.removeCommandContext();      Context.removeProcessEngineConfiguration();      Context.removeBpmnOverrideContext();  }}return null;

}

public CommandContextFactory getCommandContextFactory() {
return commandContextFactory;
}

public void setCommandContextFactory(CommandContextFactory commandContextFactory) {
this.commandContextFactory = commandContextFactory;
}

public ProcessEngineConfigurationImpl getProcessEngineConfiguration() {
return processEngineConfiguration;
}

public void setProcessEngineContext(ProcessEngineConfigurationImpl processEngineContext) {
this.processEngineConfiguration = processEngineContext;
}
在 execute 方法的 finally 块里,会调用 context.close() 方法:

public void close() {
// the intention of this method is that all resources are closed properly, even
// if exceptions occur in close or flush methods of the sessions or the
// transaction context.

try {  try {    try {        if (exception == null && closeListeners != null) {            try {                for (CommandContextCloseListener listener : closeListeners) {                    listener.closing(this);                }            } catch (Throwable exception) {                exception(exception);            }        }      if (exception == null) {        flushSessions();      }    } catch (Throwable exception) {      exception(exception);    } finally {      try {        if (exception == null) {          transactionContext.commit();        }      } catch (Throwable exception) {        exception(exception);      }        if (exception == null && closeListeners != null) {            try {                for (CommandContextCloseListener listener : closeListeners) {                    listener.closed(this);                }            } catch (Throwable exception) {                exception(exception);            }        }      if (exception != null) {        if (exception instanceof JobNotFoundException || exception instanceof ActivitiTaskAlreadyClaimedException) {          // reduce log level, because this may have been caused because of job deletion due to cancelActiviti="true"          log.info("Error while closing command context", exception);        } else if (exception instanceof ActivitiOptimisticLockingException) {          // reduce log level, as normally we're not interested in logging this exception          log.debug("Optimistic locking exception : " + exception);        } else {          log.debug("Error while closing command context", exception);        }        transactionContext.rollback();      }    }  } catch (Throwable exception) {    exception(exception);  } finally {    closeSessions();  }} catch (Throwable exception) {  exception(exception);} // rethrow the original exception if there was oneif (exception != null) {  if (exception instanceof Error) {    throw (Error) exception;  } else if (exception instanceof RuntimeException) { 
   throw (RuntimeException) exception;  } else {    throw new ActivitiException("exception while executing command " + command, exception);  }}

}
该方法会调用 flushSessions(),最终执行到如下的 flush() 方法:

/**
 * Flushes the pending changes of this session in a fixed order: inserts first,
 * then updates, then deletes.
 *
 * <p>Before flushing, unnecessary operations are removed, deserialized objects
 * are flushed and the list of updated objects is computed. When debug logging
 * is enabled, every pending insert, update and delete is logged together with
 * a summary count.
 */
public void flush() {
  List<DeleteOperation> removedOperations = removeUnnecessaryOperations();

  flushDeserializedObjects();
  List<PersistentObject> updatedObjects = getUpdatedObjects();

  if (log.isDebugEnabled()) {
    Collection<List<PersistentObject>> insertedObjectLists = insertedObjects.values();
    int nrOfInserts = 0, nrOfUpdates = 0, nrOfDeletes = 0;
    for (List<PersistentObject> insertedObjectList : insertedObjectLists) {
      for (PersistentObject insertedObject : insertedObjectList) {
        log.debug("  insert {}", insertedObject);
        nrOfInserts++;
      }
    }
    for (PersistentObject updatedObject : updatedObjects) {
      log.debug("  update {}", updatedObject);
      nrOfUpdates++;
    }
    for (DeleteOperation deleteOperation : deleteOperations) {
      log.debug("  {}", deleteOperation);
      nrOfDeletes++;
    }
    log.debug("flush summary: {} insert, {} update, {} delete.", nrOfInserts, nrOfUpdates, nrOfDeletes);
    log.debug("now executing flush...");
  }

  flushInserts();
  flushUpdates(updatedObjects);
  flushDeletes(removedOperations);
}
flush() 里会调用 flushUpdates(),此时 next.dbid 的新值才会被 update 回数据库。
因此在高并发的场景下,可能出现一个线程读取一段 id block 之后,还没来得及 update,同一段 block 就已经被另一个线程读取,造成 id 已经被占用(冲突)的情况。为解决高并发下的这个问题,可以改用 uuid 策略。

uuid策略

org.activiti.engine.impl.persistence.StrongUuidGenerator

/**
 * {@link IdGenerator} that returns the string form of UUIDs produced by a
 * shared {@link TimeBasedGenerator}.
 */
public class StrongUuidGenerator implements IdGenerator {

  // different ProcessEngines on the same classloader share one generator.
  // volatile: the field is read outside the lock in ensureGeneratorInitialized(),
  // so without it another thread could see a non-null reference to a generator
  // whose construction is not yet visible.
  protected static volatile TimeBasedGenerator timeBasedGenerator;

  /** Creates the generator, initializing the shared instance on first use. */
  public StrongUuidGenerator() {
    ensureGeneratorInitialized();
  }

  /**
   * Creates the shared generator if it does not exist yet. The null check is
   * repeated inside the class-level lock so that only one thread creates it.
   */
  protected void ensureGeneratorInitialized() {
    if (timeBasedGenerator == null) {
      synchronized (StrongUuidGenerator.class) {
        if (timeBasedGenerator == null) {
          timeBasedGenerator = Generators.timeBasedGenerator(EthernetAddress.fromInterface());
        }
      }
    }
  }

  /**
   * @return the string form of a newly generated UUID
   */
  public String getNextId() {
    return timeBasedGenerator.generate().toString();
  }

}
采用的是com.fasterxml.uuid.impl.TimeBasedGenerator

参考

  1. UUID id generator for high concurrency

关键字:activiti

版权声明

本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处。如若内容有涉嫌抄袭侵权/违法违规/事实不符,请点击 举报 进行投诉反馈!

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部