telebit/mplexer/v1.go

237 lines
5.6 KiB
Go
Raw Normal View History

2020-05-22 10:41:24 +00:00
package telebit
2020-05-18 08:43:06 +00:00
import (
"errors"
"fmt"
"strconv"
"strings"
)
const (
2020-05-21 05:30:24 +00:00
// HeaderLengthState is the 2nd (1) state
2020-05-18 08:43:06 +00:00
HeaderLengthState State = 1 + iota
2020-05-21 05:30:24 +00:00
// HeaderState is the 3rd (2) state
2020-05-18 08:43:06 +00:00
HeaderState
2020-05-21 05:30:24 +00:00
// PayloadState is the 4th (3) state
2020-05-18 08:43:06 +00:00
PayloadState
)
const (
2020-05-21 05:30:24 +00:00
// FamilyIndex is the 1st (0) address element, either IPv4 or IPv6
2020-05-18 08:43:06 +00:00
FamilyIndex int = iota
2020-05-21 05:30:24 +00:00
// AddressIndex is the 2nd (1) address element, the IP or Hostname
2020-05-18 08:43:06 +00:00
AddressIndex
2020-05-21 05:30:24 +00:00
// PortIndex is the 3rd (2) address element, the Port
2020-05-18 08:43:06 +00:00
PortIndex
2020-05-21 05:30:24 +00:00
// LengthIndex is the 4th (3) address element, the Payload size
2020-05-18 08:43:06 +00:00
LengthIndex
2020-05-21 05:30:24 +00:00
// ServiceIndex is the 5th (4) address element, the Scheme or Control message type
2020-05-18 08:43:06 +00:00
ServiceIndex
2020-05-21 10:29:05 +00:00
// RelayPortIndex is the 6th (5) address element, the port on which the connection was established
RelayPortIndex
// ServernameIndex is the 7th (6) address element, the SNI Servername or Hostname
ServernameIndex
2020-05-18 08:43:06 +00:00
)
2020-05-21 05:30:24 +00:00
// Header is the MPLEXY address/control meta data that comes before a packet
2020-05-18 08:43:06 +00:00
type Header struct {
Family string
Address string
Port string
Service string
}
2020-05-19 04:36:20 +00:00
func (p *Parser) unpackV1(b []byte) (int, error) {
2020-05-18 08:43:06 +00:00
z := 0
for {
2020-05-19 04:36:20 +00:00
if z > 20 {
2020-05-18 08:43:06 +00:00
panic("stuck in an infinite loop?")
}
2020-05-19 07:06:10 +00:00
z++
2020-05-18 08:43:06 +00:00
n := len(b)
2020-05-19 04:36:20 +00:00
if n < 1 {
//fmt.Println("[debug] v1 end", z, n)
2020-05-18 08:43:06 +00:00
break
}
var err error
switch p.parseState {
2020-05-19 04:36:20 +00:00
case VersionState:
//fmt.Println("[debug] version state", b[0])
p.state.version = b[0]
b = b[1:]
2020-05-19 07:06:10 +00:00
p.consumed++
p.parseState++
2020-05-18 08:43:06 +00:00
case HeaderLengthState:
2020-05-19 04:36:20 +00:00
//fmt.Println("[debug] v1 h len")
2020-05-18 08:43:06 +00:00
b = p.unpackV1HeaderLength(b)
case HeaderState:
2020-05-19 04:36:20 +00:00
//fmt.Println("[debug] v1 header")
2020-05-18 08:43:06 +00:00
b, err = p.unpackV1Header(b, n)
if nil != err {
2020-05-19 04:36:20 +00:00
//fmt.Println("[debug] v1 header err", err)
consumed := p.consumed
p.consumed = 0
return consumed, err
2020-05-18 08:43:06 +00:00
}
case PayloadState:
2020-05-19 04:36:20 +00:00
//fmt.Println("[debug] v1 payload")
2020-05-18 08:43:06 +00:00
// if this payload is complete, reset all state
if p.state.payloadWritten == p.state.payloadLen {
p.state = ParserState{}
2020-05-19 04:36:20 +00:00
p.parseState = 0
2020-05-18 08:43:06 +00:00
}
b, err = p.unpackV1Payload(b, n)
if nil != err {
2020-05-19 04:36:20 +00:00
consumed := p.consumed
p.consumed = 0
return consumed, err
2020-05-18 08:43:06 +00:00
}
default:
2020-05-19 04:36:20 +00:00
fmt.Println("[debug] v1 unknown state")
2020-05-18 08:43:06 +00:00
// do nothing
2020-05-19 04:36:20 +00:00
consumed := p.consumed
p.consumed = 0
return consumed, errors.New("error unpacking")
2020-05-18 08:43:06 +00:00
}
}
2020-05-19 04:36:20 +00:00
consumed := p.consumed
p.consumed = 0
return consumed, nil
2020-05-18 08:43:06 +00:00
}
func (p *Parser) unpackV1HeaderLength(b []byte) []byte {
p.state.headerLen = int(b[0])
2020-05-19 04:36:20 +00:00
//fmt.Println("[debug] unpacked header len", p.state.headerLen)
2020-05-18 08:43:06 +00:00
b = b[1:]
2020-05-19 07:06:10 +00:00
p.consumed++
p.parseState++
2020-05-18 08:43:06 +00:00
return b
}
func (p *Parser) unpackV1Header(b []byte, n int) ([]byte, error) {
2020-05-19 04:36:20 +00:00
//fmt.Println("[debug] got", len(b), "bytes", string(b))
2020-05-18 08:43:06 +00:00
m := len(p.state.header)
k := p.state.headerLen - m
if n < k {
k = n
}
2020-05-19 04:36:20 +00:00
p.consumed += k
2020-05-18 08:43:06 +00:00
c := b[0:k]
b = b[k:]
2020-05-19 04:36:20 +00:00
//fmt.Println("[debug] has", m, "want", k, "more and have", len(b), "more")
2020-05-18 08:43:06 +00:00
p.state.header = append(p.state.header, c...)
if p.state.headerLen != len(p.state.header) {
return b, nil
}
parts := strings.Split(string(p.state.header), ",")
p.state.header = nil
if len(parts) < 5 {
return nil, errors.New("error unpacking header")
}
payloadLenStr := parts[LengthIndex]
payloadLen, err := strconv.Atoi(payloadLenStr)
if nil != err {
return nil, errors.New("error unpacking header payload length")
}
p.state.payloadLen = payloadLen
port, _ := strconv.Atoi(parts[PortIndex])
service := parts[ServiceIndex]
if "control" == service {
return nil, errors.New("'control' messages not implemented")
}
2020-05-21 10:29:05 +00:00
src := Addr{
2020-05-18 08:43:06 +00:00
family: parts[FamilyIndex],
addr: parts[AddressIndex],
port: port,
2020-05-21 10:29:05 +00:00
//scheme: Scheme(service),
}
dst := Addr{
2020-05-18 08:43:06 +00:00
scheme: Scheme(service),
}
2020-05-21 10:29:05 +00:00
if len(parts) > RelayPortIndex {
port, _ := strconv.Atoi(parts[RelayPortIndex])
dst.port = port
}
if len(parts) > ServernameIndex {
dst.addr = parts[ServernameIndex]
}
p.state.srcAddr = src
p.state.dstAddr = dst
2020-05-18 08:43:06 +00:00
/*
p.state.conn = p.conns[addr.Network()]
if nil == p.state.conn {
rconn, wconn := net.Pipe()
conn := Conn{
updated: time.Now(),
2020-05-22 10:07:35 +00:00
relayTargetAddr: addr,
2020-05-18 08:43:06 +00:00
relay: rconn,
local: wconn,
}
copied := conn
p.state.conn = &copied
p.conns[addr.Network()] = p.state.conn
p.newConns <- p.state.conn
}
*/
2020-05-19 07:06:10 +00:00
p.parseState++
2020-05-18 08:43:06 +00:00
return b, nil
}
func (p *Parser) unpackV1Payload(b []byte, n int) ([]byte, error) {
// Handle "connect" and "end"
if 0 == p.state.payloadLen {
/*
p.newMsg <- msg{
addr: Addr,
bytes: []byte{},
}
2020-05-22 10:07:35 +00:00
addr := &p.state.conn.relayTargetAddr
2020-05-18 08:43:06 +00:00
if "end" == string(addr.scheme) {
if err := p.state.conn.Close(); nil != err {
// TODO log potential error?
}
}
return b, nil
*/
2020-05-19 04:36:20 +00:00
//fmt.Printf("[debug] [2] payload written: %d | payload length: %d\n", p.state.payloadWritten, p.state.payloadLen)
2020-05-21 10:29:05 +00:00
p.handler.RouteBytes(p.state.srcAddr, p.state.dstAddr, []byte{})
2020-05-18 08:43:06 +00:00
return b, nil
}
k := p.state.payloadLen - p.state.payloadWritten
2020-05-19 04:36:20 +00:00
if n < k {
2020-05-18 08:43:06 +00:00
k = n
}
c := b[0:k]
b = b[k:]
// TODO don't let a write on one connection block others,
// and also put backpressure on just that connection
/*
m, err := p.state.conn.local.Write(c)
p.state.payloadWritten += m
if nil != err {
// TODO we want to surface this error somewhere, but not to the websocket
return b, nil
}
*/
2020-05-21 10:29:05 +00:00
p.handler.RouteBytes(p.state.srcAddr, p.state.dstAddr, c)
2020-05-19 04:36:20 +00:00
p.consumed += k
2020-05-18 08:43:06 +00:00
p.state.payloadWritten += k
2020-05-19 04:36:20 +00:00
//fmt.Printf("[debug] [1] payload written: %d | payload length: %d\n", p.state.payloadWritten, p.state.payloadLen)
2020-05-18 08:43:06 +00:00
// if this payload is complete, reset all state
if p.state.payloadWritten == p.state.payloadLen {
p.state = ParserState{}
2020-05-19 04:36:20 +00:00
p.parseState = 0
2020-05-18 08:43:06 +00:00
}
return b, nil
}