diff options
Diffstat (limited to 'vendor/github.com/denisenkom/go-mssqldb/token.go')
-rw-r--r-- | vendor/github.com/denisenkom/go-mssqldb/token.go | 754 |
1 files changed, 0 insertions, 754 deletions
diff --git a/vendor/github.com/denisenkom/go-mssqldb/token.go b/vendor/github.com/denisenkom/go-mssqldb/token.go deleted file mode 100644 index 6b84f94d..00000000 --- a/vendor/github.com/denisenkom/go-mssqldb/token.go +++ /dev/null @@ -1,754 +0,0 @@ -package mssql - -import ( - "encoding/binary" - "errors" - "io" - "net" - "strconv" - "strings" - - "golang.org/x/net/context" -) - -//go:generate stringer -type token - -type token byte - -// token ids -const ( - tokenReturnStatus token = 121 // 0x79 - tokenColMetadata token = 129 // 0x81 - tokenOrder token = 169 // 0xA9 - tokenError token = 170 // 0xAA - tokenInfo token = 171 // 0xAB - tokenLoginAck token = 173 // 0xad - tokenRow token = 209 // 0xd1 - tokenNbcRow token = 210 // 0xd2 - tokenEnvChange token = 227 // 0xE3 - tokenSSPI token = 237 // 0xED - tokenDone token = 253 // 0xFD - tokenDoneProc token = 254 - tokenDoneInProc token = 255 -) - -// done flags -// https://msdn.microsoft.com/en-us/library/dd340421.aspx -const ( - doneFinal = 0 - doneMore = 1 - doneError = 2 - doneInxact = 4 - doneCount = 0x10 - doneAttn = 0x20 - doneSrvError = 0x100 -) - -// ENVCHANGE types -// http://msdn.microsoft.com/en-us/library/dd303449.aspx -const ( - envTypDatabase = 1 - envTypLanguage = 2 - envTypCharset = 3 - envTypPacketSize = 4 - envSortId = 5 - envSortFlags = 6 - envSqlCollation = 7 - envTypBeginTran = 8 - envTypCommitTran = 9 - envTypRollbackTran = 10 - envEnlistDTC = 11 - envDefectTran = 12 - envDatabaseMirrorPartner = 13 - envPromoteTran = 15 - envTranMgrAddr = 16 - envTranEnded = 17 - envResetConnAck = 18 - envStartedInstanceName = 19 - envRouting = 20 -) - -// COLMETADATA flags -// https://msdn.microsoft.com/en-us/library/dd357363.aspx -const ( - colFlagNullable = 1 - // TODO implement more flags -) - -// interface for all tokens -type tokenStruct interface{} - -type orderStruct struct { - ColIds []uint16 -} - -type doneStruct struct { - Status uint16 - CurCmd uint16 - RowCount uint64 - errors []Error -} - -func (d doneStruct) isError() bool { - return d.Status&doneError != 0 || len(d.errors) > 0 -} - -func (d doneStruct) getError() Error { - if len(d.errors) > 0 { - return d.errors[len(d.errors)-1] - } else { - return Error{Message: "Request failed but didn't provide reason"} - } -} - -type doneInProcStruct doneStruct - -var doneFlags2str = map[uint16]string{ - doneFinal: "final", - doneMore: "more", - doneError: "error", - doneInxact: "inxact", - doneCount: "count", - doneAttn: "attn", - doneSrvError: "srverror", -} - -func doneFlags2Str(flags uint16) string { - strs := make([]string, 0, len(doneFlags2str)) - for flag, tag := range doneFlags2str { - if flags&flag != 0 { - strs = append(strs, tag) - } - } - return strings.Join(strs, "|") -} - -// ENVCHANGE stream -// http://msdn.microsoft.com/en-us/library/dd303449.aspx -func processEnvChg(sess *tdsSession) { - size := sess.buf.uint16() - r := &io.LimitedReader{R: sess.buf, N: int64(size)} - for { - var err error - var envtype uint8 - err = binary.Read(r, binary.LittleEndian, &envtype) - if err == io.EOF { - return - } - if err != nil { - badStreamPanic(err) - } - switch envtype { - case envTypDatabase: - sess.database, err = readBVarChar(r) - if err != nil { - badStreamPanic(err) - } - _, err = readBVarChar(r) - if err != nil { - badStreamPanic(err) - } - case envTypLanguage: - //currently ignored - // old value - _, err = readBVarChar(r) - if err != nil { - badStreamPanic(err) - } - // new value - _, err = readBVarChar(r) - if err != nil { - badStreamPanic(err) - } - case envTypCharset: - //currently ignored - // old value - _, err = readBVarChar(r) - if err != nil { - badStreamPanic(err) - } - // new value - _, err = readBVarChar(r) - if err != nil { - badStreamPanic(err) - } - case envTypPacketSize: - packetsize, err := readBVarChar(r) - if err != nil { - badStreamPanic(err) - } - _, err = readBVarChar(r) - if err != nil { - badStreamPanic(err) - } - packetsizei, err := strconv.Atoi(packetsize) - if err != nil { - badStreamPanicf("Invalid Packet size value returned from server (%s): %s", packetsize, err.Error()) - } - sess.buf.ResizeBuffer(packetsizei) - case envSortId: - // currently ignored - // old value, should be 0 - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - // new value - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - case envSortFlags: - // currently ignored - // old value, should be 0 - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - // new value - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - case envSqlCollation: - // currently ignored - // old value - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - // new value - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - case envTypBeginTran: - tranid, err := readBVarByte(r) - if len(tranid) != 8 { - badStreamPanicf("invalid size of transaction identifier: %d", len(tranid)) - } - sess.tranid = binary.LittleEndian.Uint64(tranid) - if err != nil { - badStreamPanic(err) - } - if sess.logFlags&logTransaction != 0 { - sess.log.Printf("BEGIN TRANSACTION %x\n", sess.tranid) - } - _, err = readBVarByte(r) - if err != nil { - badStreamPanic(err) - } - case envTypCommitTran, envTypRollbackTran: - _, err = readBVarByte(r) - if err != nil { - badStreamPanic(err) - } - _, err = readBVarByte(r) - if err != nil { - badStreamPanic(err) - } - if sess.logFlags&logTransaction != 0 { - if envtype == envTypCommitTran { - sess.log.Printf("COMMIT TRANSACTION %x\n", sess.tranid) - } else { - sess.log.Printf("ROLLBACK TRANSACTION %x\n", sess.tranid) - } - } - sess.tranid = 0 - case envEnlistDTC: - // currently ignored - // old value - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - // new value, should be 0 - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - case envDefectTran: - // currently ignored - // old value, should be 0 - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - // new value - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - case envDatabaseMirrorPartner: - sess.partner, err = readBVarChar(r) - if err != nil { - badStreamPanic(err) - } - _, err = readBVarChar(r) - if err != nil { - badStreamPanic(err) - } - case envPromoteTran: - // currently ignored - // old value, should be 0 - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - // dtc token - // spec says it should be L_VARBYTE, so this code might be wrong - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - case envTranMgrAddr: - // currently ignored - // old value, should be 0 - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - // XACT_MANAGER_ADDRESS = B_VARBYTE - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - case envTranEnded: - // currently ignored - // old value, B_VARBYTE - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - // should be 0 - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - case envResetConnAck: - // currently ignored - // old value, should be 0 - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - // should be 0 - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - case envStartedInstanceName: - // currently ignored - // old value, should be 0 - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - // instance name - if _, err = readBVarChar(r); err != nil { - badStreamPanic(err) - } - case envRouting: - // RoutingData message is: - // ValueLength USHORT - // Protocol (TCP = 0) BYTE - // ProtocolProperty (new port) USHORT - // AlternateServer US_VARCHAR - _, err := readUshort(r) - if err != nil { - badStreamPanic(err) - } - protocol, err := readByte(r) - if err != nil || protocol != 0 { - badStreamPanic(err) - } - newPort, err := readUshort(r) - if err != nil { - badStreamPanic(err) - } - newServer, err := readUsVarChar(r) - if err != nil { - badStreamPanic(err) - } - // consume the OLDVALUE = %x00 %x00 - _, err = readUshort(r) - if err != nil { - badStreamPanic(err) - } - sess.routedServer = newServer - sess.routedPort = newPort - default: - // ignore rest of records because we don't know how to skip those - sess.log.Printf("WARN: Unknown ENVCHANGE record detected with type id = %d\n", envtype) - break - } - - } -} - -type returnStatus int32 - -// http://msdn.microsoft.com/en-us/library/dd358180.aspx -func parseReturnStatus(r *tdsBuffer) returnStatus { - return returnStatus(r.int32()) -} - -func parseOrder(r *tdsBuffer) (res orderStruct) { - len := int(r.uint16()) - res.ColIds = make([]uint16, len/2) - for i := 0; i < len/2; i++ { - res.ColIds[i] = r.uint16() - } - return res -} - -// https://msdn.microsoft.com/en-us/library/dd340421.aspx -func parseDone(r *tdsBuffer) (res doneStruct) { - res.Status = r.uint16() - res.CurCmd = r.uint16() - res.RowCount = r.uint64() - return res -} - -// https://msdn.microsoft.com/en-us/library/dd340553.aspx -func parseDoneInProc(r *tdsBuffer) (res doneInProcStruct) { - res.Status = r.uint16() - res.CurCmd = r.uint16() - res.RowCount = r.uint64() - return res -} - -type sspiMsg []byte - -func parseSSPIMsg(r *tdsBuffer) sspiMsg { - size := r.uint16() - buf := make([]byte, size) - r.ReadFull(buf) - return sspiMsg(buf) -} - -type loginAckStruct struct { - Interface uint8 - TDSVersion uint32 - ProgName string - ProgVer uint32 -} - -func parseLoginAck(r *tdsBuffer) loginAckStruct { - size := r.uint16() - buf := make([]byte, size) - r.ReadFull(buf) - var res loginAckStruct - res.Interface = buf[0] - res.TDSVersion = binary.BigEndian.Uint32(buf[1:]) - prognamelen := buf[1+4] - var err error - if res.ProgName, err = ucs22str(buf[1+4+1 : 1+4+1+prognamelen*2]); err != nil { - badStreamPanic(err) - } - res.ProgVer = binary.BigEndian.Uint32(buf[size-4:]) - return res -} - -// http://msdn.microsoft.com/en-us/library/dd357363.aspx -func parseColMetadata72(r *tdsBuffer) (columns []columnStruct) { - count := r.uint16() - if count == 0xffff { - // no metadata is sent - return nil - } - columns = make([]columnStruct, count) - for i := range columns { - column := &columns[i] - column.UserType = r.uint32() - column.Flags = r.uint16() - - // parsing TYPE_INFO structure - column.ti = readTypeInfo(r) - column.ColName = r.BVarChar() - } - return columns -} - -// http://msdn.microsoft.com/en-us/library/dd357254.aspx -func parseRow(r *tdsBuffer, columns []columnStruct, row []interface{}) { - for i, column := range columns { - row[i] = column.ti.Reader(&column.ti, r) - } -} - -// http://msdn.microsoft.com/en-us/library/dd304783.aspx -func parseNbcRow(r *tdsBuffer, columns []columnStruct, row []interface{}) { - bitlen := (len(columns) + 7) / 8 - pres := make([]byte, bitlen) - r.ReadFull(pres) - for i, col := range columns { - if pres[i/8]&(1<<(uint(i)%8)) != 0 { - row[i] = nil - continue - } - row[i] = col.ti.Reader(&col.ti, r) - } -} - -// http://msdn.microsoft.com/en-us/library/dd304156.aspx -func parseError72(r *tdsBuffer) (res Error) { - length := r.uint16() - _ = length // ignore length - res.Number = r.int32() - res.State = r.byte() - res.Class = r.byte() - res.Message = r.UsVarChar() - res.ServerName = r.BVarChar() - res.ProcName = r.BVarChar() - res.LineNo = r.int32() - return -} - -// http://msdn.microsoft.com/en-us/library/dd304156.aspx -func parseInfo(r *tdsBuffer) (res Error) { - length := r.uint16() - _ = length // ignore length - res.Number = r.int32() - res.State = r.byte() - res.Class = r.byte() - res.Message = r.UsVarChar() - res.ServerName = r.BVarChar() - res.ProcName = r.BVarChar() - res.LineNo = r.int32() - return -} - -func processSingleResponse(sess *tdsSession, ch chan tokenStruct) { - defer func() { - if err := recover(); err != nil { - if sess.logFlags&logErrors != 0 { - sess.log.Printf("ERROR: Intercepted panic %v", err) - } - ch <- err - } - close(ch) - }() - - packet_type, err := sess.buf.BeginRead() - if err != nil { - if sess.logFlags&logErrors != 0 { - sess.log.Printf("ERROR: BeginRead failed %v", err) - } - ch <- err - return - } - if packet_type != packReply { - badStreamPanicf("invalid response packet type, expected REPLY, actual: %d", packet_type) - } - var columns []columnStruct - errs := make([]Error, 0, 5) - for { - token := token(sess.buf.byte()) - if sess.logFlags&logDebug != 0 { - sess.log.Printf("got token %v", token) - } - switch token { - case tokenSSPI: - ch <- parseSSPIMsg(sess.buf) - return - case tokenReturnStatus: - returnStatus := parseReturnStatus(sess.buf) - ch <- returnStatus - case tokenLoginAck: - loginAck := parseLoginAck(sess.buf) - ch <- loginAck - case tokenOrder: - order := parseOrder(sess.buf) - ch <- order - case tokenDoneInProc: - done := parseDoneInProc(sess.buf) - if sess.logFlags&logRows != 0 && done.Status&doneCount != 0 { - sess.log.Printf("(%d row(s) affected)\n", done.RowCount) - } - ch <- done - case tokenDone, tokenDoneProc: - done := parseDone(sess.buf) - done.errors = errs - if sess.logFlags&logDebug != 0 { - sess.log.Printf("got DONE or DONEPROC status=%d", done.Status) - } - if done.Status&doneSrvError != 0 { - ch <- errors.New("SQL Server had internal error") - return - } - if sess.logFlags&logRows != 0 && done.Status&doneCount != 0 { - sess.log.Printf("(%d row(s) affected)\n", done.RowCount) - } - ch <- done - if done.Status&doneMore == 0 { - return - } - case tokenColMetadata: - columns = parseColMetadata72(sess.buf) - ch <- columns - case tokenRow: - row := make([]interface{}, len(columns)) - parseRow(sess.buf, columns, row) - ch <- row - case tokenNbcRow: - row := make([]interface{}, len(columns)) - parseNbcRow(sess.buf, columns, row) - ch <- row - case tokenEnvChange: - processEnvChg(sess) - case tokenError: - err := parseError72(sess.buf) - if sess.logFlags&logDebug != 0 { - sess.log.Printf("got ERROR %d %s", err.Number, err.Message) - } - errs = append(errs, err) - if sess.logFlags&logErrors != 0 { - sess.log.Println(err.Message) - } - case tokenInfo: - info := parseInfo(sess.buf) - if sess.logFlags&logDebug != 0 { - sess.log.Printf("got INFO %d %s", info.Number, info.Message) - } - if sess.logFlags&logMessages != 0 { - sess.log.Println(info.Message) - } - default: - badStreamPanicf("Unknown token type: %d", token) - } - } -} - -type parseRespIter byte - -const ( - parseRespIterContinue parseRespIter = iota // Continue parsing current token. - parseRespIterNext // Fetch the next token. - parseRespIterDone // Done with parsing the response. -) - -type parseRespState byte - -const ( - parseRespStateNormal parseRespState = iota // Normal response state. - parseRespStateCancel // Query is canceled, wait for server to confirm. - parseRespStateClosing // Waiting for tokens to come through. -) - -type parseResp struct { - sess *tdsSession - ctxDone <-chan struct{} - state parseRespState - cancelError error -} - -func (ts *parseResp) sendAttention(ch chan tokenStruct) parseRespIter { - if err := sendAttention(ts.sess.buf); err != nil { - ts.dlogf("failed to send attention signal %v", err) - ch <- err - return parseRespIterDone - } - ts.state = parseRespStateCancel - return parseRespIterContinue -} - -func (ts *parseResp) dlog(msg string) { - if ts.sess.logFlags&logDebug != 0 { - ts.sess.log.Println(msg) - } -} -func (ts *parseResp) dlogf(f string, v ...interface{}) { - if ts.sess.logFlags&logDebug != 0 { - ts.sess.log.Printf(f, v...) - } -} - -func (ts *parseResp) iter(ctx context.Context, ch chan tokenStruct, tokChan chan tokenStruct) parseRespIter { - switch ts.state { - default: - panic("unknown state") - case parseRespStateNormal: - select { - case tok, ok := <-tokChan: - if !ok { - ts.dlog("response finished") - return parseRespIterDone - } - if err, ok := tok.(net.Error); ok && err.Timeout() { - ts.cancelError = err - ts.dlog("got timeout error, sending attention signal to server") - return ts.sendAttention(ch) - } - // Pass the token along. - ch <- tok - return parseRespIterContinue - - case <-ts.ctxDone: - ts.ctxDone = nil - ts.dlog("got cancel message, sending attention signal to server") - return ts.sendAttention(ch) - } - case parseRespStateCancel: // Read all responses until a DONE or error is received.Auth - select { - case tok, ok := <-tokChan: - if !ok { - ts.dlog("response finished but waiting for attention ack") - return parseRespIterNext - } - switch tok := tok.(type) { - default: - // Ignore all other tokens while waiting. - // The TDS spec says other tokens may arrive after an attention - // signal is sent. Ignore these tokens and continue looking for - // a DONE with attention confirm mark. - case doneStruct: - if tok.Status&doneAttn != 0 { - ts.dlog("got cancellation confirmation from server") - if ts.cancelError != nil { - ch <- ts.cancelError - ts.cancelError = nil - } else { - ch <- ctx.Err() - } - return parseRespIterDone - } - - // If an error happens during cancel, pass it along and just stop. - // We are uncertain to receive more tokens. - case error: - ch <- tok - ts.state = parseRespStateClosing - } - return parseRespIterContinue - case <-ts.ctxDone: - ts.ctxDone = nil - ts.state = parseRespStateClosing - return parseRespIterContinue - } - case parseRespStateClosing: // Wait for current token chan to close. - if _, ok := <-tokChan; !ok { - ts.dlog("response finished") - return parseRespIterDone - } - return parseRespIterContinue - } -} - -func processResponse(ctx context.Context, sess *tdsSession, ch chan tokenStruct) { - ts := &parseResp{ - sess: sess, - ctxDone: ctx.Done(), - } - defer func() { - // Ensure any remaining error is piped through - // or the query may look like it executed when it actually failed. - if ts.cancelError != nil { - ch <- ts.cancelError - ts.cancelError = nil - } - close(ch) - }() - - // Loop over multiple responses. - for { - ts.dlog("initiating resonse reading") - - tokChan := make(chan tokenStruct) - go processSingleResponse(sess, tokChan) - - // Loop over multiple tokens in response. - tokensLoop: - for { - switch ts.iter(ctx, ch, tokChan) { - case parseRespIterContinue: - // Nothing, continue to next token. - case parseRespIterNext: - break tokensLoop - case parseRespIterDone: - return - } - } - } -} |