-
Notifications
You must be signed in to change notification settings - Fork 67
Expand file tree
/
Copy pathDataTable.fs
More file actions
133 lines (111 loc) · 5.86 KB
/
DataTable.fs
File metadata and controls
133 lines (111 loc) · 5.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
namespace FSharp.Data.SqlClient
open System
open System.Data
open System.Data.SqlClient
open System.Collections.Generic
open FSharp.Data.SqlClient.Internals
[<Sealed>]
[<CompilerMessageAttribute("This API supports the FSharp.Data.SqlClient infrastructure and is not intended to be used directly from your code.", 101, IsHidden = true)>]
[<RequireQualifiedAccessAttribute>]
type DataTable<'T when 'T :> DataRow>(selectCommand: SqlCommand, ?connectionString: Lazy<string>) =
inherit DataTable()
let rows = base.Rows
member __.Rows : IList<'T> = {
new IList<'T> with
member __.GetEnumerator() = rows.GetEnumerator()
member __.GetEnumerator() : IEnumerator<'T> = (Seq.cast<'T> rows).GetEnumerator()
member __.Count = rows.Count
member __.IsReadOnly = rows.IsReadOnly
member __.Item
with get index = downcast rows.[index]
and set index row =
rows.RemoveAt(index)
rows.InsertAt(row, index)
member __.Add row = rows.Add row
member __.Clear() = rows.Clear()
member __.Contains row = rows.Contains row
member __.CopyTo(dest, index) = rows.CopyTo(dest, index)
member __.IndexOf row = rows.IndexOf row
member __.Insert(index, row) = rows.InsertAt(row, index)
member __.Remove row = rows.Remove(row); true
member __.RemoveAt index = rows.RemoveAt(index)
}
member __.NewRow(): 'T = downcast base.NewRow()
member private this.IsDirectTable = not (isNull this.TableName)
member this.Update(?connection, ?transaction, ?batchSize, ?continueUpdateOnError, ?timeout: TimeSpan) =
// not supported on all DataTable instances
match selectCommand with
| null -> failwith "This command wasn't constructed from SqlProgrammabilityProvider, call to Update is not supported."
| _ -> ()
connection |> Option.iter selectCommand.set_Connection
transaction |> Option.iter selectCommand.set_Transaction
if isNull selectCommand.Connection && this.IsDirectTable
then
assert(connectionString.IsSome)
selectCommand.Connection <- new SqlConnection( connectionString.Value.Value)
use dataAdapter = new SqlDataAdapter(selectCommand)
use commandBuilder = new SqlCommandBuilder(dataAdapter)
// Pre-compute per-column metadata once rather than inside the per-row event handler.
// AutoIncrement columns always need a server-side refresh; AllowDBNull columns need one
// only when the row has no explicit value. Quoting column names is also constant.
let alwaysRefreshQuoted =
[| for c in this.Columns do
if c.AutoIncrement then
yield "inserted." + commandBuilder.QuoteIdentifier c.ColumnName |]
let maybeRefreshCols =
[| for c in this.Columns do
if c.AllowDBNull && not c.AutoIncrement then
yield c, "inserted." + commandBuilder.QuoteIdentifier c.ColumnName |]
use __ = dataAdapter.RowUpdating.Subscribe(fun args ->
timeout |> Option.iter (fun x -> args.Command.CommandTimeout <- int x.TotalSeconds)
if args.Errors = null && args.StatementType = StatementType.Insert
&& defaultArg batchSize dataAdapter.UpdateBatchSize = 1
then
let columnsToRefresh = ResizeArray(alwaysRefreshQuoted)
for c, quotedName in maybeRefreshCols do
if args.Row.IsNull c.Ordinal then
columnsToRefresh.Add quotedName
if columnsToRefresh.Count > 0
then
let outputClause = columnsToRefresh |> String.concat "," |> sprintf " OUTPUT %s"
let cmd = args.Command
let sql = cmd.CommandText
let insertOutputClauseAt =
match sql.IndexOf( " DEFAULT VALUES") with
| -1 -> sql.IndexOf( " VALUES")
| pos -> pos
cmd.CommandText <- sql.Insert(insertOutputClauseAt, outputClause)
cmd.UpdatedRowSource <- UpdateRowSource.FirstReturnedRecord
)
batchSize |> Option.iter dataAdapter.set_UpdateBatchSize
continueUpdateOnError |> Option.iter dataAdapter.set_ContinueUpdateOnError
dataAdapter.Update(this)
member this.BulkCopy(?connection, ?copyOptions, ?transaction, ?batchSize, ?timeout: TimeSpan) =
let conn', tran' =
match connection, transaction with
| _, Some(t: SqlTransaction) -> t.Connection, t
| Some c, None -> c, null
| None, None ->
match selectCommand with
| null -> failwith "To issue BulkCopy on this table, you need to provide your own connection or transaction"
| _ -> ()
if this.IsDirectTable
then
assert(connectionString.IsSome)
new SqlConnection(connectionString.Value.Value), null
else
selectCommand.Connection, selectCommand.Transaction
use __ = conn'.UseLocally()
let options = defaultArg copyOptions SqlBulkCopyOptions.Default
use bulkCopy = new SqlBulkCopy(conn', options, tran')
bulkCopy.DestinationTableName <- this.TableName
batchSize |> Option.iter bulkCopy.set_BatchSize
timeout |> Option.iter (fun x -> bulkCopy.BulkCopyTimeout <- int x.TotalSeconds)
bulkCopy.WriteToServer this
#if WITH_LEGACY_NAMESPACE
namespace FSharp.Data
open System
open System.Data
[<Obsolete("use 'FSharp.Data.SqlClient.DataTable' instead");AutoOpen>]
type DataTable<'T when 'T :> DataRow> = FSharp.Data.SqlClient.DataTable<'T>
#endif