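Strictly speaking, as soon as a CRUD statement in MyBatis contains dynamic SQL tags or ${}, it already counts as dynamic SQL. Whenever a statement is not resolved to a StaticSqlSource while MyBatis loads the mapper file, it is treated as dynamic SQL. During selectXXX or update/insert/delete, MyBatis then invokes the corresponding SqlNode implementations, together with TextSqlNode.isDynamic(), to process each tag and each ${}, reducing every SQL fragment to a StaticTextSqlNode until the final parameterized static SQL statement has been produced. So in the vast majority of queries that are not simple primary-key lookups, we are in fact using dynamic SQL.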
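To make the difference between ${} and #{} concrete, here is a minimal sketch. It is not MyBatis code: the class DynamicSqlSketch and its two regular expressions are assumptions made for illustration only. It treats a statement as dynamic when it contains a ${} token, substitutes ${} textually, and turns every #{} into a JDBC placeholder.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration; the real work is done by SqlNode/SqlSource classes in MyBatis.
final class DynamicSqlSketch {
    private static final Pattern TEXT_TOKEN = Pattern.compile("\\$\\{([^}]*)\\}");
    private static final Pattern BIND_TOKEN = Pattern.compile("#\\{[^}]*\\}");

    // A statement containing ${...} cannot be fixed at load time, so it is "dynamic".
    static boolean isDynamic(String sql) {
        return TEXT_TOKEN.matcher(sql).find();
    }

    // Replaces ${name} with raw text, then #{name} with a '?' placeholder.
    // A missing value is rendered as the text "null" in this sketch.
    static String toStaticSql(String sql, Map<String, String> textValues) {
        Matcher matcher = TEXT_TOKEN.matcher(sql);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = textValues.get(matcher.group(1).trim());
            matcher.appendReplacement(out, Matcher.quoteReplacement(String.valueOf(value)));
        }
        matcher.appendTail(out);
        return BIND_TOKEN.matcher(out.toString()).replaceAll("?");
    }
}
```

Because ${} is inserted as raw text, its value must never come from untrusted input; #{} values are bound through the JDBC placeholder instead.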
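If MappedStatement.StatementType is CALLABLE, the statement handler created in Executor.doQuery is a CallableStatementHandler instance. When the handler's prepare and parameterize methods are subsequently called, it goes through JDBC's prepareCall method for stored procedures and registers the output parameters, as follows: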
private void registerOutputParameters(CallableStatement cs) throws SQLException {
  List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
  for (int i = 0, n = parameterMappings.size(); i < n; i++) {
    ParameterMapping parameterMapping = parameterMappings.get(i);
    if (parameterMapping.getMode() == ParameterMode.OUT || parameterMapping.getMode() == ParameterMode.INOUT) {
      if (null == parameterMapping.getJdbcType()) {
        throw new ExecutorException("The JDBC Type must be specified for output parameter. Parameter: " + parameterMapping.getProperty());
      } else {
        if (parameterMapping.getNumericScale() != null && (parameterMapping.getJdbcType() == JdbcType.NUMERIC || parameterMapping.getJdbcType() == JdbcType.DECIMAL)) {
          cs.registerOutParameter(i + 1, parameterMapping.getJdbcType().TYPE_CODE, parameterMapping.getNumericScale());
        } else {
          if (parameterMapping.getJdbcTypeName() == null) {
            cs.registerOutParameter(i + 1, parameterMapping.getJdbcType().TYPE_CODE);
          } else {
            cs.registerOutParameter(i + 1, parameterMapping.getJdbcType().TYPE_CODE, parameterMapping.getJdbcTypeName());
          }
        }
      }
    }
  }
}
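MyBatis has two transaction management modes: auto-commit and manual commit. The openSession overloads of DefaultSqlSessionFactory take a parameter that controls whether transactions are committed automatically. This parameter is ultimately passed to java.sql.Connection.setAutoCommit() (a new JDBC connection is in auto-commit mode by default).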
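The transaction created when the session is opened is handed to the executor. Because the executor runs inside that transaction context, MyBatis does not need to do anything in auto-commit mode; it only has to manage the transaction in manual-commit mode. In manual-commit mode, a commit or rollback is initiated through sqlSession.commit() or sqlSession.rollback(), and at that point isCommitOrRollbackRequired is called to decide whether the transaction really needs to be committed or rolled back.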
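The decision can be sketched as follows. This is a simplified, self-contained model, not the MyBatis source: the class name SessionCommitCheck and the markDirty method are hypothetical, and the rule it encodes (act only when the session is in manual-commit mode and has pending changes, unless the caller forces it) is an assumption about how the check behaves.

```java
// Simplified model of the commit/rollback decision; SessionCommitCheck is a hypothetical name.
final class SessionCommitCheck {
    private final boolean autoCommit; // the flag passed to openSession(...)
    private boolean dirty;            // set once an insert/update/delete has run

    SessionCommitCheck(boolean autoCommit) {
        this.autoCommit = autoCommit;
    }

    // Called by the session after any data-modifying statement.
    void markDirty() {
        dirty = true;
    }

    // A real commit/rollback is needed only for a manual-commit session with
    // pending changes, or when the caller explicitly forces it.
    boolean isCommitOrRollbackRequired(boolean force) {
        return (!autoCommit && dirty) || force;
    }
}
```

Under this rule, a read-only session never issues a commit, and an auto-commit session leaves committing to the JDBC connection unless force is true.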
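Once that check has run, the commit or rollback is carried out. For the caching executor, after the commit or rollback has completed, the entriesToAddOnCommit and entriesMissedInCache collections held by TransactionalCache are either flushed into the statement's second-level cache or cleared.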
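MyBatis provides a basic cache implementation, org.apache.ibatis.cache.impl.PerpetualCache, which is backed by a plain HashMap. The second thing to know is that MyBatis has a first-level cache and a second-level cache. The first-level cache is scoped to a SqlSession: the cache regions (HashMaps) of different SqlSessions do not affect one another. It is supported by default and needs no configuration. Its scope can be changed through the localCacheScope setting, which takes the value SESSION or STATEMENT. Within one SqlSession, as long as no commit/rollback has occurred and close() has not been called, MyBatis first looks up the CacheKey of the current statement in the first-level cache and, on a hit, returns the cached result without going to the database. This is implemented in BaseExecutor.query(), which appears in the BaseExecutor listing later in this section.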
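The lookup-then-load behavior of a session-scoped cache can be illustrated with a toy model. Nothing here is MyBatis API: LocalCacheSession, its string cache keys and the Function standing in for the database are all assumptions made for the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy model of a session-scoped (first-level) cache; not MyBatis code.
final class LocalCacheSession {
    private final Map<String, Object> localCache = new HashMap<>();
    private final Function<String, Object> database; // stands in for the real JDBC call
    private int databaseHits;

    LocalCacheSession(Function<String, Object> database) {
        this.database = database;
    }

    Object query(String cacheKey) {
        Object cached = localCache.get(cacheKey);
        if (cached != null) {
            return cached; // served from the session cache, no database access
        }
        databaseHits++;
        Object result = database.apply(cacheKey);
        localCache.put(cacheKey, result);
        return result;
    }

    // update, commit and rollback all invalidate the session cache
    void update() { localCache.clear(); }
    void commit() { localCache.clear(); }
    void rollback() { localCache.clear(); }

    int databaseHits() { return databaseHits; }
}
```

Running the same query twice in one session touches the database once; after update(), commit() or rollback() the next query goes to the database again.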
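The second-level cache is scoped to a mapper: when several SqlSessions execute SQL statements of the same mapper, they share its second-level cache, so it spans SqlSessions. It is not enabled by default and must be turned on explicitly by declaring cache in the mapper. It is implemented in CachingExecutor.query(), which appears in the CachingExecutor listing in section 5.4.4.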
In MyBatis's cache implementation, the cache key CacheKey is composed as: cacheKey = ID + offset + limit + sql + parameterValues + environmentId. The CacheKey of the example statement in this book follows this layout.
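To make the composition concrete, here is a simplified stand-in for such a composite key. SketchCacheKey is a hypothetical class, and the statement id, SQL text and environment name used with it are made up; the real CacheKey also maintains a running hash and checksum, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a composite cache key; SketchCacheKey is a hypothetical name.
final class SketchCacheKey {
    private final List<Object> parts = new ArrayList<>();

    // Appends one component; order matters for equality.
    SketchCacheKey update(Object part) {
        parts.add(part);
        return this;
    }

    // Builds the key in the order ID + offset + limit + sql + parameterValues + environmentId.
    static SketchCacheKey of(String statementId, int offset, int limit, String sql,
                             List<?> parameterValues, String environmentId) {
        SketchCacheKey key = new SketchCacheKey();
        key.update(statementId).update(offset).update(limit).update(sql);
        for (Object value : parameterValues) {
            key.update(value);
        }
        key.update(environmentId);
        return key;
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof SketchCacheKey && parts.equals(((SketchCacheKey) other).parts);
    }

    @Override
    public int hashCode() {
        return parts.hashCode();
    }
}
```

Two executions produce equal keys only when the statement, paging window, SQL text, every parameter value and the environment all match, which is why changing a single parameter leads to a cache miss.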
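For modular microservice systems, neither the first-level nor the second-level cache of MyBatis is really suitable for business data. This is especially true for OLTP systems (CRM/BI do not count here): if the data must be strictly accurate, these caches are a poor fit, and such systems are better off using only the ORM features of MyBatis. That said, if no proper caching layer was designed up front, the caches are valuable for data that rarely changes, such as data dictionaries, system parameters and business configuration items.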
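What is an executor? Every selectXXX and insert/update/delete operation we issue through sqlSession at the application layer is, after dynamic SQL and parameter handling, delegated to a concrete executor. The executor is responsible for managing the first- and second-level caches, the actual transaction handling, and the Statement-level and JDBC-level optimizations. An executor therefore resembles a strategy factory sitting beneath sqlSession, and the user chooses which one to use through configuration. However, a MyBatis configuration has only one executor type, which may not suit every situation, particularly in small and medium-sized companies whose microservices are not well separated, because their systems typically mix OLTP and ETL workloads. Let us first look at the definition of the Executor interface: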
<E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey cacheKey, BoundSql boundSql) throws SQLException;
<E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException;
<E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException;
CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql);
void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType);
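As can be seen above, MyBatis provides two kinds of executors: caching and non-caching. Which non-caching executor is used is controlled by the defaultExecutorType property under settings in the configuration file (default SIMPLE); whether the caching executor is used is controlled by the cacheEnabled setting (default true). The caching executor is not a functionally independent executor but a decorator around a non-caching executor. We start with the non-caching executors. There are three of them, all built on the base class BaseExecutor, which implements most of the common functionality, as shown below:
package org.apache.ibatis.executor;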
...
public abstract class BaseExecutor implements Executor {
protected Transaction transaction;
protected Executor wrapper;
protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads;
// MyBatis first-level (local) cache; PerpetualCache is internally backed by a plain Map
protected PerpetualCache localCache;
// Cache for stored-procedure output parameters
protected PerpetualCache localOutputParameterCache;
protected Configuration configuration;
protected int queryStack;
// Whether the underlying connection of the transaction has been released
private boolean closed;
protected BaseExecutor(Configuration configuration, Transaction transaction) {
this.transaction = transaction;
this.deferredLoads = new ConcurrentLinkedQueue<DeferredLoad>();
this.localCache = new PerpetualCache("LocalCache");
this.localOutputParameterCache = new PerpetualCache("LocalOutputParameterCache");
this.closed = false;
this.configuration = configuration;
this.wrapper = this;
}
@Override
public Transaction getTransaction() {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
return transaction;
}
// Close the transaction associated with this executor
@Override
public void close(boolean forceRollback) {
try {
try {
rollback(forceRollback);
} finally {
if (transaction != null) {
transaction.close();
}
}
} catch (SQLException e) {
// Ignore. Theres nothing that can be done at this point.
log.warn("Unexpected exception on closing transaction. Cause: " + e);
} finally {
transaction = null;
deferredLoads = null;
localCache = null;
localOutputParameterCache = null;
closed = true;
}
}
@Override
public boolean isClosed() {
return closed;
}
// Update operation
@Override
public int update(MappedStatement ms, Object parameter) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
clearLocalCache();
return doUpdate(ms, parameter);
}
@Override
public List<BatchResult> flushStatements() throws SQLException {
return flushStatements(false);
}
public List<BatchResult> flushStatements(boolean isRollBack) throws SQLException {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
return doFlushStatements(isRollBack);
}
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
BoundSql boundSql = ms.getBoundSql(parameter);
CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
@SuppressWarnings("unchecked")
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
if (queryStack == 0 && ms.isFlushCacheRequired()) {
clearLocalCache();
}
List<E> list;
try {
queryStack++;
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
if (list != null) {
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
queryStack--;
}
if (queryStack == 0) {
for (DeferredLoad deferredLoad : deferredLoads) {
deferredLoad.load();
}
// issue #601
deferredLoads.clear();
if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
// issue #482
clearLocalCache();
}
}
return list;
}
@Override
public <E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException {
BoundSql boundSql = ms.getBoundSql(parameter);
return doQueryCursor(ms, parameter, rowBounds, boundSql);
}
@Override
public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
DeferredLoad deferredLoad = new DeferredLoad(resultObject, property, key, localCache, configuration, targetType);
if (deferredLoad.canLoad()) {
deferredLoad.load();
} else {
deferredLoads.add(new DeferredLoad(resultObject, property, key, localCache, configuration, targetType));
}
}
@Override
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
CacheKey cacheKey = new CacheKey();
cacheKey.update(ms.getId());
cacheKey.update(rowBounds.getOffset());
cacheKey.update(rowBounds.getLimit());
cacheKey.update(boundSql.getSql());
List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
// mimic DefaultParameterHandler logic
for (ParameterMapping parameterMapping : parameterMappings) {
if (parameterMapping.getMode() != ParameterMode.OUT) {
Object value;
String propertyName = parameterMapping.getProperty();
if (boundSql.hasAdditionalParameter(propertyName)) {
value = boundSql.getAdditionalParameter(propertyName);
} else if (parameterObject == null) {
value = null;
} else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
value = parameterObject;
} else {
MetaObject metaObject = configuration.newMetaObject(parameterObject);
value = metaObject.getValue(propertyName);
}
cacheKey.update(value);
}
}
if (configuration.getEnvironment() != null) {
// issue #176
cacheKey.update(configuration.getEnvironment().getId());
}
return cacheKey;
}
@Override
public boolean isCached(MappedStatement ms, CacheKey key) {
return localCache.getObject(key) != null;
}
@Override
public void commit(boolean required) throws SQLException {
if (closed) {
throw new ExecutorException("Cannot commit, transaction is already closed");
}
clearLocalCache();
flushStatements();
if (required) {
transaction.commit();
}
}
@Override
public void rollback(boolean required) throws SQLException {
if (!closed) {
try {
clearLocalCache();
flushStatements(true);
} finally {
if (required) {
transaction.rollback();
}
}
}
}
@Override
public void clearLocalCache() {
if (!closed) {
localCache.clear();
localOutputParameterCache.clear();
}
}
// The next four methods are implemented by subclasses
protected abstract int doUpdate(MappedStatement ms, Object parameter)
throws SQLException;
protected abstract List<BatchResult> doFlushStatements(boolean isRollback)
throws SQLException;
protected abstract <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)
throws SQLException;
protected abstract <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql)
throws SQLException;
protected void closeStatement(Statement statement) {
if (statement != null) {
try {
statement.close();
} catch (SQLException e) {
// ignore
}
}
}
/**
* Apply a transaction timeout.
* @param statement a current statement
* @throws SQLException if a database access error occurs, this method is called on a closed <code>Statement</code>
* @since 3.4.0
* @see StatementUtil#applyTransactionTimeout(Statement, Integer, Integer)
*/
protected void applyTransactionTimeout(Statement statement) throws SQLException {
StatementUtil.applyTransactionTimeout(statement, statement.getQueryTimeout(), transaction.getTimeout());
}
private void handleLocallyCachedOutputParameters(MappedStatement ms, CacheKey key, Object parameter, BoundSql boundSql) {
if (ms.getStatementType() == StatementType.CALLABLE) {
final Object cachedParameter = localOutputParameterCache.getObject(key);
if (cachedParameter != null && parameter != null) {
final MetaObject metaCachedParameter = configuration.newMetaObject(cachedParameter);
final MetaObject metaParameter = configuration.newMetaObject(parameter);
for (ParameterMapping parameterMapping : boundSql.getParameterMappings()) {
if (parameterMapping.getMode() != ParameterMode.IN) {
final String parameterName = parameterMapping.getProperty();
final Object cachedValue = metaCachedParameter.getValue(parameterName);
metaParameter.setValue(parameterName, cachedValue);
}
}
}
}
}
private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
List<E> list;
localCache.putObject(key, EXECUTION_PLACEHOLDER);
try {
list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
} finally {
localCache.removeObject(key);
}
localCache.putObject(key, list);
if (ms.getStatementType() == StatementType.CALLABLE) {
localOutputParameterCache.putObject(key, parameter);
}
return list;
}
protected Connection getConnection(Log statementLog) throws SQLException {
Connection connection = transaction.getConnection();
if (statementLog.isDebugEnabled()) {
return ConnectionLogger.newInstance(connection, statementLog, queryStack);
} else {
return connection;
}
}
@Override
public void setExecutorWrapper(Executor wrapper) {
this.wrapper = wrapper;
}
private static class DeferredLoad {
private final MetaObject resultObject;
private final String property;
private final Class<?> targetType;
private final CacheKey key;
private final PerpetualCache localCache;
private final ObjectFactory objectFactory;
private final ResultExtractor resultExtractor;
// issue #781
public DeferredLoad(MetaObject resultObject,
String property,
CacheKey key,
PerpetualCache localCache,
Configuration configuration,
Class<?> targetType) {
this.resultObject = resultObject;
this.property = property;
this.key = key;
this.localCache = localCache;
this.objectFactory = configuration.getObjectFactory();
this.resultExtractor = new ResultExtractor(configuration, objectFactory);
this.targetType = targetType;
}
public boolean canLoad() {
return localCache.getObject(key) != null && localCache.getObject(key) != EXECUTION_PLACEHOLDER;
}
public void load() {
@SuppressWarnings( "unchecked" )
// we suppose we get back a List
List<Object> list = (List<Object>) localCache.getObject(key);
Object value = resultExtractor.extractObjectFromList(list, targetType);
resultObject.setValue(property, value);
}
}
}
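Let us first look at the fields of BaseExecutor. From the definition above we can see that:
An executor runs within a specific transaction context;
It has a local cache and a local output-parameter cache (both are cleared whenever the transaction is committed or rolled back, an update is executed, or a query is flagged to flush the cache);
It maintains a queue of deferred-load tasks;
BaseExecutor implements most of the common functionality, including local cache management, transaction commit and rollback, timeout settings and deferred loading, but leaves the following four methods to concrete subclasses: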
protected abstract int doUpdate(MappedStatement ms, Object parameter)
throws SQLException;
protected abstract List<BatchResult> doFlushStatements(boolean isRollback)
throws SQLException;
protected abstract <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)
throws SQLException;
protected abstract <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql)
throws SQLException;
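Functionally, the three executors differ as follows:
ExecutorType.SIMPLE: this executor does nothing special. It creates a new prepared statement for every execution of every statement.
ExecutorType.REUSE: this executor reuses prepared statements.
ExecutorType.BATCH: this executor batches all update statements; it is essentially a facade over the JDBC addBatch API.
The three executor types therefore target different workloads. Apart from SIMPLE, the other two demand a well-designed system architecture, but they also offer a greater payoff.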
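How the executor type and the cacheEnabled setting combine can be sketched as a small selection function. This is only an illustration of the decorator relationship described above: SketchExecutorType and ExecutorSelection are hypothetical names, and the function returns a description string instead of building real executors.

```java
// Hypothetical mirror of the three executor types discussed in the text.
enum SketchExecutorType { SIMPLE, REUSE, BATCH }

final class ExecutorSelection {
    // Picks the non-caching executor by type, then optionally wraps it
    // in the caching decorator when cacheEnabled is true.
    static String describe(SketchExecutorType type, boolean cacheEnabled) {
        String base;
        switch (type) {
            case BATCH:
                base = "BatchExecutor";
                break;
            case REUSE:
                base = "ReuseExecutor";
                break;
            default:
                base = "SimpleExecutor";
        }
        return cacheEnabled ? "CachingExecutor(" + base + ")" : base;
    }
}
```

With the defaults mentioned in the text (SIMPLE and cacheEnabled = true), the result is a SimpleExecutor wrapped by a CachingExecutor.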
5.4.1 The SIMPLE executor
Let us first look at how SimpleExecutor implements each method:
public class SimpleExecutor extends BaseExecutor {
public SimpleExecutor(Configuration configuration, Transaction transaction) {
super(configuration, transaction);
}
@Override
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
Statement stmt = null;
try {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
stmt = prepareStatement(handler, ms.getStatementLog());
return handler.update(stmt);
} finally {
closeStatement(stmt);
}
}
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
Statement stmt = null;
try {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
stmt = prepareStatement(handler, ms.getStatementLog());
return handler.<E>query(stmt, resultHandler);
} finally {
closeStatement(stmt);
}
}
@Override
protected <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, null, boundSql);
Statement stmt = prepareStatement(handler, ms.getStatementLog());
return handler.<E>queryCursor(stmt);
}
@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
return Collections.emptyList();
}
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection, transaction.getTimeout());
handler.parameterize(stmt);
return stmt;
}
}
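The simple executor's implementation is very straightforward, so we will not go through it in detail. Next, let us look at the REUSE executor.
5.4.2 The REUSE executor
Here is where the REUSE executor differs from the SIMPLE executor: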
public class ReuseExecutor extends BaseExecutor {
private final Map<String, Statement> statementMap = new HashMap<String, Statement>();
public ReuseExecutor(Configuration configuration, Transaction transaction) {
super(configuration, transaction);
}
@Override
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
Statement stmt = prepareStatement(handler, ms.getStatementLog());
return handler.update(stmt);
}
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
Statement stmt = prepareStatement(handler, ms.getStatementLog());
return handler.<E>query(stmt, resultHandler);
}
@Override
protected <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, null, boundSql);
Statement stmt = prepareStatement(handler, ms.getStatementLog());
return handler.<E>queryCursor(stmt);
}
@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
for (Statement stmt : statementMap.values()) {
closeStatement(stmt);
}
statementMap.clear();
return Collections.emptyList();
}
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
BoundSql boundSql = handler.getBoundSql();
String sql = boundSql.getSql();
if (hasStatementFor(sql)) {
stmt = getStatement(sql);
applyTransactionTimeout(stmt);
} else {
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection, transaction.getTimeout());
putStatement(sql, stmt);
}
handler.parameterize(stmt);
return stmt;
}
private boolean hasStatementFor(String sql) {
try {
return statementMap.keySet().contains(sql) && !statementMap.get(sql).getConnection().isClosed();
} catch (SQLException e) {
return false;
}
}
private Statement getStatement(String s) {
return statementMap.get(s);
}
private void putStatement(String sql, Statement stmt) {
statementMap.put(sql, stmt);
}
}
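As the implementation shows, REUSE differs from SIMPLE in doUpdate/doQuery: a statement is no longer closed after each execution but is cached by its SQL text and reused wherever possible. However, database servers usually limit the number of statement handles (cursors, in Oracle terms) per connection and globally: Oracle controls this with the open_cursors parameter and MySQL with the max_prepared_stmt_count system variable. If the SQL is assembled from many different if combinations, the distinct statements can pile up over time and exhaust database resources. Whether REUSE is worthwhile depends on how much of the overall cost goes into creating Statements and on how many completely different Statements there are. Generally, for a pure OLTP workload whose SQL maps are not auto-generated, it performs better than the SIMPLE executor.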
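The trade-off can be seen in a small model of the statement map. StatementReuseSketch and FakeStatement are hypothetical stand-ins (no JDBC is involved); the sketch only counts how many statements are prepared and how many stay open until the next flush.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of statement reuse keyed by SQL text; not the ReuseExecutor itself.
final class StatementReuseSketch {
    // Hypothetical stand-in for java.sql.Statement.
    static final class FakeStatement {
        final String sql;
        FakeStatement(String sql) { this.sql = sql; }
    }

    private final Map<String, FakeStatement> statementMap = new HashMap<>();
    private int prepareCount;

    // Returns the cached statement for this SQL text, preparing it only on first use.
    FakeStatement prepare(String sql) {
        FakeStatement stmt = statementMap.get(sql);
        if (stmt == null) {
            stmt = new FakeStatement(sql);
            prepareCount++;
            statementMap.put(sql, stmt);
        }
        return stmt;
    }

    // Mirrors doFlushStatements: every cached statement is released.
    void flush() { statementMap.clear(); }

    int prepareCount() { return prepareCount; }
    int openStatements() { return statementMap.size(); }
}
```

Identical SQL text is prepared once, but every distinct text produced by dynamic SQL adds another open statement until flush() runs, which is the resource pressure described above.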
5.4.3 The BATCH executor
The BATCH executor is implemented as follows:
public class BatchExecutor extends BaseExecutor {
public static final int BATCH_UPDATE_RETURN_VALUE = Integer.MIN_VALUE + 1002;
// Statements of the batched DML executed within one transaction
private final List<Statement> statementList = new ArrayList<Statement>();
// Parameter objects for each DML statement, including auto-generated or manually generated keys
private final List<BatchResult> batchResultList = new ArrayList<BatchResult>();
// SQL text of the most recently submitted statement
private String currentSql;
// The most recently submitted MappedStatement
private MappedStatement currentStatement;
public BatchExecutor(Configuration configuration, Transaction transaction) {
super(configuration, transaction);
}
@Override
public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
final Configuration configuration = ms.getConfiguration();
final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
final BoundSql boundSql = handler.getBoundSql();
final String sql = boundSql.getSql();
final Statement stmt;
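// If this statement is the same as the immediately preceding one, reuse the cached statement instead of creating a new one, and just add the parameter object to that statement's BatchResult
// Otherwise, append a new statement to the list, whether or not earlier statements are still pending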
if (sql.equals(currentSql) && ms.equals(currentStatement)) {
int last = statementList.size() - 1;
stmt = statementList.get(last);
applyTransactionTimeout(stmt);
handler.parameterize(stmt);//fix Issues 322
BatchResult batchResult = batchResultList.get(last);
batchResult.addParameterObject(parameterObject);
} else {
Connection connection = getConnection(ms.getStatementLog());
stmt = handler.prepare(connection, transaction.getTimeout());
handler.parameterize(stmt); //fix Issues 322
currentSql = sql;
currentStatement = ms;
statementList.add(stmt);
batchResultList.add(new BatchResult(ms, sql, parameterObject));
}
// handler.parameterize(stmt);
// Calls JDBC's addBatch method
handler.batch(stmt);
return BATCH_UPDATE_RETURN_VALUE;
}
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)
throws SQLException {
Statement stmt = null;
try {
flushStatements();
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameterObject, rowBounds, resultHandler, boundSql);
Connection connection = getConnection(ms.getStatementLog());
stmt = handler.prepare(connection, transaction.getTimeout());
handler.parameterize(stmt);
return handler.<E>query(stmt, resultHandler);
} finally {
closeStatement(stmt);
}
}
@Override
protected <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException {
flushStatements();
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, null, boundSql);
Connection connection = getConnection(ms.getStatementLog());
Statement stmt = handler.prepare(connection, transaction.getTimeout());
handler.parameterize(stmt);
return handler.<E>queryCursor(stmt);
}
@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
try {
List<BatchResult> results = new ArrayList<BatchResult>();
if (isRollback) {
return Collections.emptyList();
}
for (int i = 0, n = statementList.size(); i < n; i++) {
Statement stmt = statementList.get(i);
applyTransactionTimeout(stmt);
BatchResult batchResult = batchResultList.get(i);
try {
batchResult.setUpdateCounts(stmt.executeBatch());
MappedStatement ms = batchResult.getMappedStatement();
List<Object> parameterObjects = batchResult.getParameterObjects();
KeyGenerator keyGenerator = ms.getKeyGenerator();
if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator;
jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
} else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { //issue #141
for (Object parameter : parameterObjects) {
keyGenerator.processAfter(this, ms, stmt, parameter);
}
}
// Close statement to close cursor #1109
closeStatement(stmt);
} catch (BatchUpdateException e) {
StringBuilder message = new StringBuilder();
message.append(batchResult.getMappedStatement().getId())
.append(" (batch index #")
.append(i + 1)
.append(")")
.append(" failed.");
if (i > 0) {
message.append(" ")
.append(i)
.append(" prior sub executor(s) completed successfully, but will be rolled back.");
}
throw new BatchExecutorException(message.toString(), e, results, batchResult);
}
results.add(batchResult);
}
return results;
} finally {
for (Statement stmt : statementList) {
closeStatement(stmt);
}
currentSql = null;
statementList.clear();
batchResultList.clear();
}
}
}
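The batch executor is a wrapper around JDBC's Statement.addBatch. For bulk inserts, such as an ETL job importing a large amount of data, it can greatly improve DML performance if the driver supports it (first and foremost because network round trips are drastically reduced). With MySQL, for example, on driver versions 5.1.13 and later, adding the rewriteBatchedStatements parameter to the connection string, i.e. jdbc:mysql://192.168.1.100:3306/test?rewriteBatchedStatements=true, can improve performance by a factor of several tens; see https://www.cnblogs.com/kxdblog/p/4056010.html and http://blog.sina.com.cn/s/blog_68b4c68f01013yog.html. Because BatchExecutor calls executeBatch() for every entry in statementList, the worst case is interleaving different DML statements, which degenerates to the original one-statement-at-a-time behavior. The following is an example of that worst case: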
for (int i = 0; i < 100; i++) {
    session.update("insertUser", userReq);
    session.update("insertUserProfile", userReq);
}
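The cost of interleaving can be estimated with a small counting sketch. It assumes the grouping rule seen in doUpdate above (a new JDBC statement is created whenever the incoming statement differs from the immediately preceding one); BatchGroupingSketch is a hypothetical helper, and comparing statement ids stands in for comparing both SQL text and MappedStatement.

```java
import java.util.List;

// Counts how many separate JDBC batches a "same as previous" grouping rule creates.
final class BatchGroupingSketch {
    static int countBatches(List<String> statementIds) {
        int batches = 0;
        String current = null;
        for (String id : statementIds) {
            // A new batch starts whenever the statement differs from the previous one.
            if (!id.equals(current)) {
                batches++;
                current = id;
            }
        }
        return batches;
    }
}
```

For the loop above, the interleaved order yields 200 single-row batches, whereas issuing all insertUser calls first and all insertUserProfile calls afterwards yields 2 batches of 100 rows each, provided the business logic allows that reordering.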
5.4.4 Implementation of the caching executor CachingExecutor
public class CachingExecutor implements Executor {
private final Executor delegate;
private final TransactionalCacheManager tcm = new TransactionalCacheManager();
public CachingExecutor(Executor delegate) {
this.delegate = delegate;
delegate.setExecutorWrapper(this);
}
@Override
public Transaction getTransaction() {
return delegate.getTransaction();
}
@Override
public void close(boolean forceRollback) {
try {
//issues #499, #524 and #573
if (forceRollback) {
tcm.rollback();
} else {
tcm.commit();
}
} finally {
delegate.close(forceRollback);
}
}
@Override
public boolean isClosed() {
return delegate.isClosed();
}
@Override
public int update(MappedStatement ms, Object parameterObject) throws SQLException {
flushCacheIfRequired(ms);
return delegate.update(ms, parameterObject);
}
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
BoundSql boundSql = ms.getBoundSql(parameterObject);
CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
@Override
public <E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException {
flushCacheIfRequired(ms);
return delegate.queryCursor(ms, parameter, rowBounds);
}
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
throws SQLException {
Cache cache = ms.getCache();
// First check whether the second-level cache is enabled
if (cache != null) {
flushCacheIfRequired(ms);
if (ms.isUseCache() && resultHandler == null) {
ensureNoOutParams(ms, boundSql);
@SuppressWarnings("unchecked")
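// Then check whether the cache holds a matching entry (normally a DML operation clears the cache; this can also be set explicitly at the statement level). If so, return it and skip the second query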
List<E> list = (List<E>) tcm.getObject(cache, key);
if (list == null) {
list = delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
tcm.putObject(cache, key, list); // issue #578 and #116
}
return list;
}
}
return delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
@Override
public List<BatchResult> flushStatements() throws SQLException {
return delegate.flushStatements();
}
@Override
public void commit(boolean required) throws SQLException {
delegate.commit(required);
tcm.commit();
}
@Override
public void rollback(boolean required) throws SQLException {
try {
delegate.rollback(required);
} finally {
if (required) {
tcm.rollback();
}
}
}
// Stored procedures with OUT parameters are not supported by the second-level cache
private void ensureNoOutParams(MappedStatement ms, BoundSql boundSql) {
if (ms.getStatementType() == StatementType.CALLABLE) {
for (ParameterMapping parameterMapping : boundSql.getParameterMappings()) {
if (parameterMapping.getMode() != ParameterMode.IN) {
throw new ExecutorException("Caching stored procedures with OUT params is not supported. Please configure useCache=false in " + ms.getId() + " statement.");
}
}
}
}
@Override
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
return delegate.createCacheKey(ms, parameterObject, rowBounds, boundSql);
}
@Override
public boolean isCached(MappedStatement ms, CacheKey key) {
return delegate.isCached(ms, key);
}
@Override
public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) {
delegate.deferLoad(ms, resultObject, property, key, targetType);
}
@Override
public void clearLocalCache() {
delegate.clearLocalCache();
}
private void flushCacheIfRequired(MappedStatement ms) {
Cache cache = ms.getCache();
if (cache != null && ms.isFlushCacheRequired()) {
tcm.clear(cache);
}
}
@Override
public void setExecutorWrapper(Executor executor) {
throw new UnsupportedOperationException("This method should not be called");
}
}
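The caching executor differs from the other executors first of all in that query() checks whether the second-level cache (the mapper-level cache) is in use. Although MyBatis enables CachingExecutor by default, if no second-level cache is explicitly declared at the mapper level it simply delegates to the wrapped executor (SimpleExecutor by default). The second-level cache is maintained by TransactionalCache. When putObject or removeObject is called through TransactionalCacheManager, the object is not immediately stored in or removed from the cache; it is first placed in one of two HashMaps, entriesToAddOnCommit or entriesToRemoveOnCommit, and only when commit/rollback runs is it actually stored in or removed from the cache. See TransactionalCache.commit/rollback for details.
The other difference is that it uses TransactionalCacheManager to keep cache changes in step with the transaction; the rest of the logic is the same.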
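The deferred-write idea can be shown with a minimal sketch. TransactionalCacheSketch is a hypothetical class that models only the staging of puts; removals, cache clearing and miss tracking in the real TransactionalCache are deliberately omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of a cache whose writes become visible only on commit.
final class TransactionalCacheSketch {
    private final Map<String, Object> sharedCache; // stands in for the second-level cache
    private final Map<String, Object> entriesToAddOnCommit = new HashMap<>();

    TransactionalCacheSketch(Map<String, Object> sharedCache) {
        this.sharedCache = sharedCache;
    }

    // Reads always go to the shared cache, so uncommitted entries stay invisible.
    Object getObject(String key) {
        return sharedCache.get(key);
    }

    // Writes are only staged here.
    void putObject(String key, Object value) {
        entriesToAddOnCommit.put(key, value);
    }

    // On commit the staged entries are published to the shared cache.
    void commit() {
        sharedCache.putAll(entriesToAddOnCommit);
        entriesToAddOnCommit.clear();
    }

    // On rollback the staged entries are simply discarded.
    void rollback() {
        entriesToAddOnCommit.clear();
    }
}
```

Staging writes this way keeps results read inside a transaction that is later rolled back from leaking into a cache shared by other sessions.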
5.2 The parameter handler ParameterHandler
The ParameterHandler interface is defined as follows:
public interface ParameterHandler {
Object getParameterObject();
void setParameters(PreparedStatement ps)
throws SQLException;
}
ParameterHandler has only one default implementation, DefaultParameterHandler, whose code is as follows:
public class DefaultParameterHandler implements ParameterHandler {
private final TypeHandlerRegistry typeHandlerRegistry;
private final MappedStatement mappedStatement;
private final Object parameterObject;
private final BoundSql boundSql;
private final Configuration configuration;
public DefaultParameterHandler(MappedStatement mappedStatement, Object parameterObject, BoundSql boundSql) {
this.mappedStatement = mappedStatement;
this.configuration = mappedStatement.getConfiguration();
this.typeHandlerRegistry = mappedStatement.getConfiguration().getTypeHandlerRegistry();
this.parameterObject = parameterObject;
this.boundSql = boundSql;
}
@Override
public Object getParameterObject() {
return parameterObject;
}
// Set the input parameters of the PreparedStatement
@Override
public void setParameters(PreparedStatement ps) {
ErrorContext.instance().activity("setting parameters").object(mappedStatement.getParameterMap().getId());
List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
if (parameterMappings != null) {
for (int i = 0; i < parameterMappings.size(); i++) {
ParameterMapping parameterMapping = parameterMappings.get(i);
if (parameterMapping.getMode() != ParameterMode.OUT) {
Object value;
String propertyName = parameterMapping.getProperty();
if (boundSql.hasAdditionalParameter(propertyName)) { // issue #448 ask first for additional params
value = boundSql.getAdditionalParameter(propertyName);
} else if (parameterObject == null) {
value = null;
} else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
value = parameterObject;
} else {
MetaObject metaObject = configuration.newMetaObject(parameterObject);
value = metaObject.getValue(propertyName);
}
TypeHandler typeHandler = parameterMapping.getTypeHandler();
JdbcType jdbcType = parameterMapping.getJdbcType();
if (value == null && jdbcType == null) {
jdbcType = configuration.getJdbcTypeForNull();
}
try {
typeHandler.setParameter(ps, i + 1, value, jdbcType);
} catch (TypeException e) {
throw new TypeException("Could not set parameters for mapping: " + parameterMapping + ". Cause: " + e, e);
} catch (SQLException e) {
throw new TypeException("Could not set parameters for mapping: " + parameterMapping + ". Cause: " + e, e);
}
}
}
}
}
}
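The ParameterHandler implementation is straightforward. Each step was explained in detail earlier, in the walkthrough of statement execution, so it is not repeated here.
5.3 Statement Handler (StatementHandler)
Let's first look at the definition of StatementHandler: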
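The lookup order that setParameters applies to each parameter can be illustrated with a minimal sketch. It does not use any MyBatis classes: ParameterValueResolver, DemoUser and the SIMPLE_TYPES set are hypothetical stand-ins (the set plays the role of TypeHandlerRegistry.hasTypeHandler, and the getter lookup plays the role of MetaObject.getValue), so treat it as an approximation of the order of checks rather than the real implementation.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of the value lookup order in DefaultParameterHandler.setParameters.
class ParameterValueResolver {
    // Assumption: these types stand in for "a TypeHandler is registered for the parameter class".
    private static final Set<Class<?>> SIMPLE_TYPES = new HashSet<>(
            Arrays.asList(String.class, Integer.class, Long.class, Boolean.class));

    static Object resolve(String property, Object parameterObject, Map<String, Object> additionalParameters) {
        // 1. Additional parameters are asked first (issue #448 in the original source).
        if (additionalParameters.containsKey(property)) {
            return additionalParameters.get(property);
        }
        // 2. No parameter object at all: the value is null.
        if (parameterObject == null) {
            return null;
        }
        // 3. The parameter object itself is a simple value, so it is bound directly.
        if (SIMPLE_TYPES.contains(parameterObject.getClass())) {
            return parameterObject;
        }
        // 4. Otherwise read the named property from a Map or from a bean getter.
        if (parameterObject instanceof Map) {
            return ((Map<?, ?>) parameterObject).get(property);
        }
        return readGetter(parameterObject, property);
    }

    private static Object readGetter(Object target, String property) {
        String getter = "get" + Character.toUpperCase(property.charAt(0)) + property.substring(1);
        try {
            Method method = target.getClass().getMethod(getter);
            method.setAccessible(true);
            return method.invoke(target);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("No readable property '" + property + "'", e);
        }
    }
}

// Hypothetical bean used only to exercise the resolver.
class DemoUser {
    private final String name;

    DemoUser(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}
```

In this sketch, resolving "name" against a DemoUser goes through the getter, while resolving any property name against a plain Integer parameter returns the Integer itself, which is why a single simple-typed argument can be referenced under any placeholder name.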
public interface StatementHandler {
Statement prepare(Connection connection, Integer transactionTimeout)
throws SQLException;
void parameterize(Statement statement)
throws SQLException;
void batch(Statement statement)
throws SQLException;
int update(Statement statement)
throws SQLException;
<E> List<E> query(Statement statement, ResultHandler resultHandler)
throws SQLException;
<E> Cursor<E> queryCursor(Statement statement)
throws SQLException;
BoundSql getBoundSql();
ParameterHandler getParameterHandler();
}
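As the interface shows, a StatementHandler is mainly responsible for preparing a statement, setting its parameters, executing it, and returning the SQL that is to be executed. MyBatis ships three StatementHandler implementations: PreparedStatementHandler, SimpleStatementHandler and CallableStatementHandler.
They correspond to the JDBC PreparedStatement, Statement and CallableStatement respectively. BaseStatementHandler is the abstract parent class of all three and encapsulates implementation details such as setting the timeout and the fetch size of the result set. The code is as follows:
public abstract class BaseStatementHandler implements StatementHandler {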
protected final Configuration configuration;
protected final ObjectFactory objectFactory;
protected final TypeHandlerRegistry typeHandlerRegistry;
protected final ResultSetHandler resultSetHandler;
protected final ParameterHandler parameterHandler;
protected final Executor executor;
protected final MappedStatement mappedStatement;
protected final RowBounds rowBounds;
protected BoundSql boundSql;
protected BaseStatementHandler(Executor executor, MappedStatement mappedStatement, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) {
this.configuration = mappedStatement.getConfiguration();
this.executor = executor;
this.mappedStatement = mappedStatement;
this.rowBounds = rowBounds;
this.typeHandlerRegistry = configuration.getTypeHandlerRegistry();
this.objectFactory = configuration.getObjectFactory();
if (boundSql == null) { // issue #435, get the key before calculating the statement
// First run the SQL statement of the SelectKey so that the ID is generated
generateKeys(parameterObject);
boundSql = mappedStatement.getBoundSql(parameterObject);
}
this.boundSql = boundSql;
this.parameterHandler = configuration.newParameterHandler(mappedStatement, parameterObject, boundSql);
this.resultSetHandler = configuration.newResultSetHandler(executor, mappedStatement, rowBounds, parameterHandler, resultHandler, boundSql);
}
@Override
public BoundSql getBoundSql() {
return boundSql;
}
@Override
public ParameterHandler getParameterHandler() {
return parameterHandler;
}
// Prepare the SQL statement
@Override
public Statement prepare(Connection connection, Integer transactionTimeout) throws SQLException {
ErrorContext.instance().sql(boundSql.getSql());
Statement statement = null;
try {
// Create the Statement
statement = instantiateStatement(connection);
setStatementTimeout(statement, transactionTimeout);
setFetchSize(statement);
return statement;
} catch (SQLException e) {
closeStatement(statement);
throw e;
} catch (Exception e) {
closeStatement(statement);
throw new ExecutorException("Error preparing statement. Cause: " + e, e);
}
}
// Each statement type is instantiated differently: a plain Statement calls java.sql.Connection.createStatement, while a PreparedStatement calls java.sql.Connection.prepareStatement
protected abstract Statement instantiateStatement(Connection connection) throws SQLException;
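// Set the JDBC statement timeout. Note: the database server can also enforce a statement timeout. MySQL uses the max_execution_time variable (named max_statement_time in early 5.7 releases and in MariaDB); Oracle does not support this as of 12c Release 2 (12.2)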
protected void setStatementTimeout(Statement stmt, Integer transactionTimeout) throws SQLException {
Integer queryTimeout = null;
if (mappedStatement.getTimeout() != null) {
queryTimeout = mappedStatement.getTimeout();
} else if (configuration.getDefaultStatementTimeout() != null) {
queryTimeout = configuration.getDefaultStatementTimeout();
}
if (queryTimeout != null) {
stmt.setQueryTimeout(queryTimeout);
}
StatementUtil.applyTransactionTimeout(stmt, queryTimeout, transactionTimeout);
}
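// fetchSize sets how many rows are fetched from the server per round trip. The default depends on the database driver: MySQL fetches the whole result set at once, Oracle defaults to 10 rows. A well-chosen fetchSize helps avoid OOM and also affects performance, especially when network latency is high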
protected void setFetchSize(Statement stmt) throws SQLException {
Integer fetchSize = mappedStatement.getFetchSize();
if (fetchSize != null) {
stmt.setFetchSize(fetchSize);
return;
}
Integer defaultFetchSize = configuration.getDefaultFetchSize();
if (defaultFetchSize != null) {
stmt.setFetchSize(defaultFetchSize);
}
}
protected void closeStatement(Statement statement) {
try {
if (statement != null) {
statement.close();
}
} catch (SQLException e) {
//ignore
}
}
protected void generateKeys(Object parameter) {
KeyGenerator keyGenerator = mappedStatement.getKeyGenerator();
ErrorContext.instance().store();
keyGenerator.processBefore(executor, mappedStatement, null, parameter);
ErrorContext.instance().recall();
}
}
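The precedence rules in setStatementTimeout and setFetchSize can be condensed into two pure functions, shown below as a sketch. StatementSettings and its method names are hypothetical. The transaction-timeout branch reflects an assumption about StatementUtil.applyTransactionTimeout, whose body is not shown above: the transaction timeout is taken to override the query timeout only when no positive query timeout is set or when the transaction timeout is shorter.

```java
// Hypothetical helper mirroring the decision order in BaseStatementHandler;
// it returns the value that would be passed to the JDBC Statement, or null
// when nothing would be set and the driver default stays in effect.
class StatementSettings {

    static Integer effectiveTimeout(Integer statementTimeout, Integer defaultTimeout, Integer transactionTimeout) {
        // The per-statement timeout wins over the global default.
        Integer queryTimeout = statementTimeout != null ? statementTimeout : defaultTimeout;
        if (transactionTimeout == null) {
            return queryTimeout;
        }
        // Assumption: the transaction timeout applies if no usable query timeout
        // exists or if it is the shorter of the two.
        if (queryTimeout == null || queryTimeout == 0 || transactionTimeout < queryTimeout) {
            return transactionTimeout;
        }
        return queryTimeout;
    }

    static Integer effectiveFetchSize(Integer statementFetchSize, Integer defaultFetchSize) {
        // The per-statement fetch size wins over the global default.
        return statementFetchSize != null ? statementFetchSize : defaultFetchSize;
    }
}
```

For example, with a statement timeout of 30 seconds, a default of 60 and a remaining transaction timeout of 10, the sketch yields 10; without a transaction timeout it yields 30.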
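5.4 Result Set Handler (ResultSetHandler)
As the name suggests, the result set handler processes query result sets; its goal is to map a JDBC result set to business objects. Its interface is defined as follows: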
public interface ResultSetHandler {
<E> List<E> handleResultSets(Statement stmt) throws SQLException;
<E> Cursor<E> handleCursorResultSets(Statement stmt) throws SQLException;
void handleOutputParameters(CallableStatement cs) throws SQLException;
}
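The three methods defined in the interface handle, respectively, the result sets of regular queries, the result sets of cursor queries, and the output parameters of stored procedure calls. Like the parameter handler, the result set handler has a single default implementation, DefaultResultSetHandler. Its responsibilities include object instantiation, automatic property mapping, regular property assignment, nested ResultMap handling, nested query handling, and discriminator handling. Each of these was covered in detail in the analysis of selectXXX execution; see that section for specifics.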
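To make the idea of instantiation plus automatic property mapping concrete, here is a deliberately simplified sketch. It is not how DefaultResultSetHandler is written: RowMapperSketch and DemoOrder are hypothetical names, rows are modelled as plain maps instead of a java.sql.ResultSet, and column matching is reduced to a case-insensitive comparison that ignores underscores.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for result mapping: one object per row,
// each column assigned through a setter whose name matches the column.
class RowMapperSketch {

    static <T> List<T> mapRows(List<Map<String, Object>> rows, Class<T> type) {
        List<T> result = new ArrayList<>();
        try {
            for (Map<String, Object> row : rows) {
                // Instantiate the target object through its no-argument constructor.
                java.lang.reflect.Constructor<T> constructor = type.getDeclaredConstructor();
                constructor.setAccessible(true);
                T target = constructor.newInstance();
                for (Map.Entry<String, Object> column : row.entrySet()) {
                    Method setter = findSetter(type, column.getKey());
                    // Columns without a matching property are skipped, as are null values.
                    if (setter != null && column.getValue() != null) {
                        setter.setAccessible(true);
                        setter.invoke(target, column.getValue());
                    }
                }
                result.add(target);
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not map rows to " + type.getName(), e);
        }
        return result;
    }

    // Matches "customer_name" or "CUSTOMERNAME" to setCustomerName (an assumed naming rule).
    private static Method findSetter(Class<?> type, String column) {
        String wanted = column.replace("_", "");
        for (Method method : type.getMethods()) {
            if (method.getName().startsWith("set") && method.getParameterCount() == 1
                    && method.getName().substring(3).equalsIgnoreCase(wanted)) {
                return method;
            }
        }
        return null;
    }
}

// Hypothetical business object used only for the demonstration.
class DemoOrder {
    private long id;
    private String customerName;

    public long getId() { return id; }
    public void setId(long id) { this.id = id; }
    public String getCustomerName() { return customerName; }
    public void setCustomerName(String customerName) { this.customerName = customerName; }
}
```

The real handler additionally deals with type handlers, nested result maps, nested queries and discriminators, none of which this sketch attempts.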