Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/EFCore.Sharding/DependencyInjection/IShardingBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ public interface IShardingBuilder
/// <returns></returns>
IShardingBuilder CreateShardingTableOnStarting(bool enable);

/// <summary>
/// 启动时创建表完成
/// </summary>
/// <param name="callback"></param>
/// <returns></returns>
IShardingBuilder CreateShardingTableOnStartingFinish(Action callback);

/// <summary>
/// 是否启用分表数据库迁移,默认false
/// </summary>
Expand Down
4 changes: 3 additions & 1 deletion src/EFCore.Sharding/DependencyInjection/IShardingConfig.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Linq;

namespace EFCore.Sharding
Expand All @@ -7,6 +8,7 @@ internal interface IShardingConfig
{
DatabaseType FindADbType();
List<(string suffix, string conString, DatabaseType dbType)> GetReadTables<T>(IQueryable<T> source);
List<(string suffix, string conString, DatabaseType dbType)> GetReadTables<T>(IQueryable<T> source, DateTime s, DateTime e);
List<(string suffix, string conString, DatabaseType dbType)> GetWriteTables<T>(IQueryable<T> source = null);
(string suffix, string conString, DatabaseType dbType) GetTheWriteTable<T>(T obj);
}
Expand Down
22 changes: 21 additions & 1 deletion src/EFCore.Sharding/DependencyInjection/ShardingContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ internal class ShardingContainer : IShardingConfig, IShardingBuilder
#region 构造函数

private readonly IServiceCollection _services;
private Action _createShardingTableOnStartingFinishFun = null;
public ShardingContainer(IServiceCollection services)
{
_services = services;
Expand Down Expand Up @@ -145,6 +146,16 @@ private void AddShardingTable(string absTableName, string fullTableName)

return FilterTable(allTables, source);
}
public List<(string suffix, string conString, DatabaseType dbType)> GetReadTables<T>(IQueryable<T> source, DateTime s, DateTime e)
{
var allTables = GetTargetTables<T>(ReadWriteType.Read);
var ret = FilterTable(allTables, source);
var entityType = typeof(T);
var rule = _shardingRules.Where(x => x.EntityType == entityType).FirstOrDefault();
var sT = rule.GetTableSuffixByField(s);
var eT = rule.GetTableSuffixByField(e);
return ret.Where(t => t.suffix.CompareTo(sT) >= 0 && t.suffix.CompareTo(eT) <= 0).ToList();
}
public DatabaseType FindADbType()
{
return _dataSources.FirstOrDefault().DbType;
Expand Down Expand Up @@ -381,7 +392,10 @@ public IShardingBuilder SetDateSharding<TEntity>(string shardingField, ExpandByD

theTime = paramter.nextTime(theTime);
}

if (sharingOption.CreateShardingTableOnStarting)
{
_createShardingTableOnStartingFinishFun?.Invoke();
}
//定时自动建表
JobHelper.SetCronJob(() =>
{
Expand Down Expand Up @@ -448,6 +462,12 @@ public IShardingBuilder SetHashModSharding<TEntity>(string shardingField, int mo
return this;
}

public IShardingBuilder CreateShardingTableOnStartingFinish(Action callback)
{
_createShardingTableOnStartingFinishFun = callback;
return this;
}

#endregion
}
}
2 changes: 1 addition & 1 deletion src/EFCore.Sharding/Sharding/IShardingQueryable.T.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public interface IShardingQueryable<T> where T : class
/// 异步获取列表
/// </summary>
/// <returns></returns>
Task<List<T>> ToListAsync();
Task<List<T>> ToListAsync((DateTime s, DateTime e)? range = null);

/// <summary>
/// 获取第一个,若不存在则返回默认值
Expand Down
6 changes: 4 additions & 2 deletions src/EFCore.Sharding/Sharding/ShardingIQueryable.T.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public List<T> ToList()
{
return AsyncHelper.RunSync(() => ToListAsync());
}
public async Task<List<T>> ToListAsync()
public async Task<List<T>> ToListAsync((DateTime s, DateTime e)? range = null)
{
//去除分页,获取前Take+Skip数量
int? take = _source.GetTakeCount();
Expand All @@ -164,7 +164,9 @@ public async Task<List<T>> ToListAsync()
noPaginSource = noPaginSource.Take(take.Value + skip.Value);

//从各个分表获取数据
var tables = _shardingConfig.GetReadTables(_source);
var tables = range == null ? _shardingConfig.GetReadTables(_source)
: _shardingConfig.GetReadTables(_source, range.Value.s, range.Value.e)
;
SynchronizedCollection<IDbAccessor> dbs = new SynchronizedCollection<IDbAccessor>();
List<Task<List<T>>> tasks = tables.Select(aTable =>
{
Expand Down