Compare commits

..

1 commit

Author SHA1 Message Date
bemasher
c2fca58625 Switch from rtl_tcp to direct access. 2015-11-26 23:06:46 -07:00
2 changed files with 102 additions and 138 deletions

View file

@ -38,7 +38,7 @@ var logFile *os.File
var sampleFilename = flag.String("samplefile", os.DevNull, "raw signal dump file")
var sampleFile *os.File
var msgType = flag.String("msgtype", "scm", "message type to receive: scm, idm, scm+idm or r900")
var msgType = flag.String("msgtype", "scm", "message type to receive: scm, idm or r900")
var symbolLength = flag.Int("symbollength", 72, "symbol length in samples")

238
recv.go
View file

@ -24,24 +24,21 @@ import (
"log"
"os"
"os/signal"
"runtime/pprof"
"strings"
"sync"
"time"
"github.com/bemasher/rtlamr/idm"
"github.com/bemasher/rtlamr/parse"
"github.com/bemasher/rtlamr/r900"
"github.com/bemasher/rtlamr/scm"
"github.com/bemasher/rtltcp"
"github.com/jpoirier/gortlsdr"
)
var rcvr Receiver
type Receiver struct {
rtltcp.SDR
*rtlsdr.Context
p parse.Parser
q parse.Parser
fc parse.FilterChain
}
@ -51,9 +48,6 @@ func (rcvr *Receiver) NewReceiver() {
rcvr.p = scm.NewParser(*symbolLength, *decimation)
case "idm":
rcvr.p = idm.NewParser(*symbolLength, *decimation)
case "scm+idm":
rcvr.p = idm.NewParser(*symbolLength, *decimation)
rcvr.q = scm.NewParser(*symbolLength, *decimation)
case "r900":
rcvr.p = r900.NewParser(*symbolLength, *decimation)
default:
@ -62,34 +56,16 @@ func (rcvr *Receiver) NewReceiver() {
if !*quiet {
rcvr.p.Log()
if(rcvr.q != nil) {
rcvr.q.Log()
}
}
// Connect to rtl_tcp server.
if err := rcvr.Connect(nil); err != nil {
// Open rtl-sdr dongle.
var err error
if rcvr.Context, err = rtlsdr.Open(0); err != nil {
log.Fatal(err)
}
rcvr.HandleFlags()
// Tell the user how many gain settings were reported by rtl_tcp.
if !*quiet {
log.Println("GainCount:", rcvr.SDR.Info.GainCount)
}
centerfreqFlagSet := false
sampleRateFlagSet := false
gainFlagSet := false
flag.Visit(func(f *flag.Flag) {
switch f.Name {
case "centerfreq":
centerfreqFlagSet = true
case "samplerate":
sampleRateFlagSet = true
case "gainbyindex", "tunergainmode", "tunergain", "agcmode":
gainFlagSet = true
case "unique":
rcvr.fc.Add(NewUniqueFilter())
case "filterid":
@ -99,139 +75,126 @@ func (rcvr *Receiver) NewReceiver() {
}
})
// Set some parameters for listening.
if centerfreqFlagSet {
rcvr.SetCenterFreq(uint32(rcvr.Flags.CenterFreq))
} else {
rcvr.SetCenterFreq(rcvr.p.Cfg().CenterFreq)
if err := rcvr.SetCenterFreq(int(rcvr.p.Cfg().CenterFreq)); err != nil {
log.Fatal(err)
}
if err := rcvr.SetSampleRate(int(rcvr.p.Cfg().SampleRate)); err != nil {
log.Fatal(err)
}
if err := rcvr.SetTunerGainMode(false); err != nil {
log.Fatal(err)
}
if !sampleRateFlagSet {
rcvr.SetSampleRate(uint32(rcvr.p.Cfg().SampleRate))
}
if !gainFlagSet {
rcvr.SetGainMode(true)
}
log.Println(rcvr.GetCenterFreq())
log.Println(rcvr.GetSampleRate())
rcvr.ResetBuffer()
return
}
func (rcvr *Receiver) Run() {
in, out := io.Pipe()
in2, out2 := io.Pipe()
// Setup signal channel for interruption.
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Kill, os.Interrupt)
go func() {
tcpBlock := make([]byte, 16384)
for {
n, err := rcvr.Read(tcpBlock)
if err != nil {
return
}
out.Write(tcpBlock[:n])
if(rcvr.q != nil) {
out2.Write(tcpBlock[:n])
}
}
// Setup time limit channel
tLimit := make(<-chan time.Time, 1)
if *timeLimit != 0 {
tLimit = time.After(*timeLimit)
}
in, out := io.Pipe()
userCtx := rtlsdr.UserCtx(out)
defer func() {
in.Close()
out.Close()
}()
var wg sync.WaitGroup
ctx := &rtlsdr.CustUserCtx{
func(buf []byte, ctx *rtlsdr.UserCtx) {
out := (*ctx).(*io.PipeWriter)
out.Write(buf)
},
&userCtx,
}
go rcvr.ReadAsync2(ctx, 1, 16384)
block := make([]byte, rcvr.p.Cfg().BlockSize2)
start := time.Now()
looper := func(p parse.Parser, fc parse.FilterChain, in io.Reader) {
// Setup signal channel for interruption.
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Kill, os.Interrupt)
for {
// Exit on interrupt or time limit, otherwise receive.
select {
case <-sigint:
return
case <-tLimit:
fmt.Println("Time Limit Reached:", time.Since(start))
return
default:
// Read new sample block.
_, err := io.ReadFull(in, block)
if err != nil {
log.Fatal("Error reading samples: ", err)
}
// Setup time limit channel
tLimit := make(<-chan time.Time, 1)
if *timeLimit != 0 {
tLimit = time.After(*timeLimit)
}
pktFound := false
indices := rcvr.p.Dec().Decode(block)
block := make([]byte, p.Cfg().BlockSize2)
for {
// Exit on interrupt or time limit, otherwise receive.
select {
case <-sigint:
return
case <-tLimit:
fmt.Println("Time Limit Reached:", time.Since(start))
return
default:
// Read new sample block.
_, err := io.ReadFull(in, block)
for _, pkt := range rcvr.p.Parse(indices) {
if !rcvr.fc.Match(pkt) {
continue
}
var msg parse.LogMessage
msg.Time = time.Now()
msg.Offset, _ = sampleFile.Seek(0, os.SEEK_CUR)
msg.Length = rcvr.p.Cfg().BufferLength << 1
msg.Message = pkt
err = encoder.Encode(msg)
if err != nil {
log.Fatal("Error reading samples: ", err)
log.Fatal("Error encoding message: ", err)
}
pktFound := false
indices := p.Dec().Decode(block)
// The XML encoder doesn't write new lines after each
// element, add them.
if _, ok := encoder.(*xml.Encoder); ok {
fmt.Fprintln(logFile)
}
for _, pkt := range p.Parse(indices) {
if !fc.Match(pkt) {
continue
pktFound = true
if *single {
if len(meterID.UintMap) == 0 {
break
} else {
delete(meterID.UintMap, uint(pkt.MeterID()))
}
}
}
var msg parse.LogMessage
msg.Time = time.Now()
msg.Offset, _ = sampleFile.Seek(0, os.SEEK_CUR)
msg.Length = p.Cfg().BufferLength << 1
msg.Message = pkt
err = encoder.Encode(msg)
if pktFound {
if *sampleFilename != os.DevNull {
_, err = sampleFile.Write(rcvr.p.Dec().IQ)
if err != nil {
log.Fatal("Error encoding message: ", err)
}
// The XML encoder doesn't write new lines after each
// element, add them.
if _, ok := encoder.(*xml.Encoder); ok {
fmt.Fprintln(logFile)
}
pktFound = true
if *single {
if len(meterID.UintMap) == 0 {
break
} else {
delete(meterID.UintMap, uint(pkt.MeterID()))
}
log.Fatal("Error writing raw samples to file:", err)
}
}
if pktFound {
if *sampleFilename != os.DevNull {
_, err = sampleFile.Write(p.Dec().IQ)
if err != nil {
log.Fatal("Error writing raw samples to file:", err)
}
}
if *single && len(meterID.UintMap) == 0 {
return
}
if *single && len(meterID.UintMap) == 0 {
return
}
}
}
}
wg.Add(1);
go looper(rcvr.p, rcvr.fc, in);
if(rcvr.q != nil) {
wg.Add(1);
go looper(rcvr.q, rcvr.fc, in2);
}
wg.Wait();
}
func init() {
log.SetFlags(log.Lshortfile | log.Lmicroseconds)
}
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to this file")
func main() {
rcvr.RegisterFlags()
RegisterFlags()
flag.Parse()
@ -239,18 +202,19 @@ func main() {
rcvr.NewReceiver()
defer logFile.Close()
defer sampleFile.Close()
defer rcvr.Close()
defer func() {
logFile.Close()
sampleFile.Close()
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
fmt.Println("Cancelling...")
err := rcvr.CancelAsync()
if err != nil {
log.Fatal(err)
}
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
fmt.Println("Closing...")
rcvr.Close()
os.Exit(0)
}()
rcvr.Run()
}