Browse Source

Fixed the buffer size in ReadUDP, moved some code to seperate methods.

master
Martins Eglitis 6 months ago
parent
commit
7240de804a
16 changed files with 201 additions and 177 deletions
  1. +1
    -0
      .gitignore
  2. +45
    -62
      aggregate/aggregate.go
  3. +2
    -16
      client/main.go
  4. +12
    -7
      helpers/helpers.go
  5. +28
    -15
      main.go
  6. +12
    -17
      periodic/periodic.go
  7. +0
    -0
      profiles/.gitkeep
  8. +9
    -3
      scripts/build1.sh
  9. +9
    -3
      scripts/build2.sh
  10. +9
    -3
      scripts/build3.sh
  11. +1
    -2
      scripts/tar.sh
  12. +2
    -2
      scripts/test_1_ring.sh
  13. +2
    -2
      scripts/test_2_ring.sh
  14. +26
    -17
      send/send.go
  15. +30
    -17
      serve/serve.go
  16. +13
    -11
      web/web.go

+ 1
- 0
.gitignore View File

@@ -8,3 +8,4 @@ client/main
*.bin
_Downloads/*
_SharedFiles/*
profiles

+ 45
- 62
aggregate/aggregate.go View File

@@ -12,7 +12,7 @@ import (
"github.com/sitilge/Peerster/structs"
)

func Packet(client bool, conn *net.UDPConn, size int, buff []byte, socketIn structs.Socket, socketOut structs.Socket, name string, simple bool) {
func Packet(client bool, conn *net.UDPConn, size int, buff []byte, socketLocal structs.Socket, socketRemote structs.Socket, name string, simple bool) {
if client {
//only Message structs by clients
message := structs.Message{}
@@ -24,9 +24,9 @@ func Packet(client bool, conn *net.UDPConn, size int, buff []byte, socketIn stru
}

if simple {
PacketClientSimple(conn, socketIn, socketOut, message, name, simple)
PacketClientSimple(conn, socketRemote, message, name)
} else {
PacketClientRumor(conn, socketIn, socketOut, message, name, simple)
PacketClientRumor(conn, socketLocal, message, name)
}
} else {
//only GossipPackets structs by peers
@@ -39,60 +39,52 @@ func Packet(client bool, conn *net.UDPConn, size int, buff []byte, socketIn stru
}

if packet.Simple != nil {
PacketGossipSimple(conn, socketIn, socketOut, packet, name, simple)
PacketGossipSimple(conn, socketRemote, packet)
} else if packet.Rumor != nil {
PacketGossipRumor(conn, socketIn, socketOut, packet, name, simple)
PacketGossipRumor(conn, socketLocal, packet)
} else if packet.Status != nil {
PacketGossipStatus(conn, socketIn, socketOut, packet, name, simple)
PacketGossipStatus(conn, socketLocal, packet)
}
}
}

func PacketClientRumor(conn *net.UDPConn, socketIn structs.Socket, socketOut structs.Socket, message structs.Message, name string, simple bool) {
func PacketClientRumor(conn *net.UDPConn, socket structs.Socket, message structs.Message, name string) {
fmt.Printf("CLIENT MESSAGE %s \n", message.Text)
helpers.PrintPeers()

structs.LocalID++

packetOut := structs.GossipPacket{}
packetOut.Rumor = &structs.RumorMessage{
packet := structs.GossipPacket{}
packet.Rumor = &structs.RumorMessage{
Origin: name,
ID: structs.LocalID,
Text: message.Text,
}

structs.Rumours = append(structs.Rumours, *packetOut.Rumor)
structs.Rumours = append(structs.Rumours, *packet.Rumor)

//send to random
peer := helpers.RandomPeer(socketIn)

fmt.Printf("MONGERING with %s:%s \n", peer.IP, peer.Port)

helpers.AddStatus(packetOut)
peer := helpers.RandomPeer(socket)

send.Gossip(conn, peer, packetOut)
send.Gossip(conn, peer, packet)
}

func PacketGossipRumor(conn *net.UDPConn, socketIn structs.Socket, socketOut structs.Socket, packetIn structs.GossipPacket, name string, simple bool) {
fmt.Printf("RUMOR origin %s from %s ID %v contents %s\n", packetIn.Rumor.Origin, fmt.Sprintf("%s:%s", socketIn.IP, socketIn.Port), packetIn.Rumor.ID, packetIn.Rumor.Text)
func PacketGossipRumor(conn *net.UDPConn, socket structs.Socket, packet structs.GossipPacket) {
fmt.Printf("RUMOR origin %s from %s ID %v contents %s\n", packet.Rumor.Origin, fmt.Sprintf("%s:%s", socket.IP, socket.Port), packet.Rumor.ID, packet.Rumor.Text)

old := helpers.RumourExists(*packetIn.Rumor)
old := helpers.RumourExists(*packet.Rumor)

//always ACK, even if the message is not new
send.Status(conn, socketIn)
send.Status(conn, socket)

if !old {
//forwarding the packet
packetOut := packetIn
packetOut := packet

structs.Rumours = append(structs.Rumours, *packetOut.Rumor)

//send to random
peer := helpers.RandomPeer(socketIn)

fmt.Printf("MONGERING with %s:%s \n", peer.IP, peer.Port)

helpers.AddStatus(packetOut)
peer := helpers.RandomPeer(socket)

send.Gossip(conn, peer, packetOut)
}
@@ -100,8 +92,8 @@ func PacketGossipRumor(conn *net.UDPConn, socketIn structs.Socket, socketOut str
helpers.PrintPeers()
}

func PacketGossipStatus(conn *net.UDPConn, socketIn structs.Socket, socketOut structs.Socket, packet structs.GossipPacket, name string, simple bool) {
fmt.Printf("STATUS from %s:%s", socketIn.IP, socketIn.Port)
func PacketGossipStatus(conn *net.UDPConn, socket structs.Socket, packet structs.GossipPacket) {
fmt.Printf("STATUS from %s:%s", socket.IP, socket.Port)
for _, want := range packet.Status.Want {
fmt.Printf(" peer %s nextID %v", want.Identifier, want.NextID)
}
@@ -110,20 +102,18 @@ func PacketGossipStatus(conn *net.UDPConn, socketIn structs.Socket, socketOut st
//TODO - send out just ONE message, either Rumor or Status, depending on flag type (0 = rumour, 1 = status)
packetOut := structs.GossipPacket{}
rumourOut := structs.RumorMessage{}
socket := structs.Socket{}
socketRemote := structs.Socket{}
flag := 0

//looping through all statuses
for _, statusIn := range packet.Status.Want {
//ack status packet
helpers.AckStatus(statusIn)
for _, status := range packet.Status.Want {
helpers.AckStatus(status)

//TODO - localID is the largest ID, where rumor.Origin == statusIn.Identifier
localID := uint32(0)

//TODO - search for the largest ID
for _, rumour := range structs.Rumours {
if rumour.Origin == statusIn.Identifier {
if rumour.Origin == status.Identifier {
if rumour.ID > localID {
//TODO - just saving, in case I will be sending Rumour
rumourOut = rumour
@@ -138,18 +128,17 @@ func PacketGossipStatus(conn *net.UDPConn, socketIn structs.Socket, socketOut st
//fmt.Printf("%s: localNextID = %v, NextID = %v \n", statusIn.Identifier, localID, statusIn.NextID)

//TODO - I am sender!
if localID > statusIn.NextID {
if localID > status.NextID {
//I have new messages
//fmt.Printf("New messages in receiver %v \n", name)
//fmt.Println(structs.Rumours)

flag = 1

socket = socketIn
} else if localID < statusIn.NextID {
socketRemote = socket
} else if localID < status.NextID {
//You have new messages
//if name == statusIn.Identifier {
// fmt.Printf("I have more messages than %v \n", socketIn.Port)
// fmt.Printf("I have more messages than %v \n", socketLocal.Port)
//} else {
// fmt.Printf("New messages in %v \n", statusIn.Identifier)
//}
@@ -157,10 +146,10 @@ func PacketGossipStatus(conn *net.UDPConn, socketIn structs.Socket, socketOut st

flag = 2

socket = socketIn
socketRemote = socket
} else {
//We both have the same messages
fmt.Printf("IN SYNC WITH %s:%s \n", socketIn.IP, socketIn.Port)
fmt.Printf("IN SYNC WITH %s:%s \n", socket.IP, socket.Port)

r := rand.Int() % 2

@@ -168,39 +157,35 @@ func PacketGossipStatus(conn *net.UDPConn, socketIn structs.Socket, socketOut st
flag = 1

//send to random
socket = helpers.RandomPeer(socketIn)
socketRemote = helpers.RandomPeer(socket)

fmt.Printf("FLIPPED COIN sending rumor to %s:%s \n", socket.IP, socket.Port)
} else {
fmt.Printf("FLIPPED COIN unlucky \n")
fmt.Printf("FLIPPED COIN sending rumor to %s:%s \n", socketRemote.IP, socketRemote.Port)
}
}
}

if flag == 1 {
packetOut.Rumor = &rumourOut
fmt.Printf("MONGERING with %s:%s \n", socket.IP, socket.Port)

helpers.AddStatus(packetOut)

//send to the same peer
send.Gossip(conn, socket, packetOut)
send.Gossip(conn, socketRemote, packetOut)
} else if flag == 2 {
send.Status(conn, socket)
send.Status(conn, socketRemote)
}

helpers.PrintPeers()
}

func PacketGossipSimple(conn *net.UDPConn, socketIn structs.Socket, socketOut structs.Socket, packet structs.GossipPacket, name string, simple bool) {
func PacketGossipSimple(conn *net.UDPConn, socketRemote structs.Socket, packet structs.GossipPacket) {
fmt.Printf("SIMPLE MESSAGE origin %s from %s contents %s \n", packet.Simple.OriginalName, packet.Simple.RelayPeerAddr, packet.Simple.Contents)
helpers.PrintPeers()

//add relay peer address to peers
relay, err := helpers.ParseSocket(packet.Simple.RelayPeerAddr)

if err != nil {
fmt.Printf("unable to parse socket: %s", err.Error())
fmt.Println(err.Error())

return
}

//specifically add RELAY
@@ -209,13 +194,11 @@ func PacketGossipSimple(conn *net.UDPConn, socketIn structs.Socket, socketOut st
packetOut := structs.GossipPacket{}
packetOut.Simple = &structs.SimpleMessage{
OriginalName: packet.Simple.OriginalName,
RelayPeerAddr: fmt.Sprintf("%s:%s", socketOut.IP, socketOut.Port),
RelayPeerAddr: fmt.Sprintf("%s:%s", socketRemote.IP, socketRemote.Port),
Contents: packet.Simple.Contents,
}

//send to everyone
for _, peer := range structs.Peers {
//except relay itself
if peer.IP == relay.IP && peer.Port == relay.Port {
continue
}
@@ -224,19 +207,19 @@ func PacketGossipSimple(conn *net.UDPConn, socketIn structs.Socket, socketOut st
}
}

func PacketClientSimple(conn *net.UDPConn, socketIn structs.Socket, socketOut structs.Socket, message structs.Message, name string, simple bool) {
func PacketClientSimple(conn *net.UDPConn, socket structs.Socket, message structs.Message, name string) {
fmt.Printf("CLIENT MESSAGE %s \n", message.Text)
helpers.PrintPeers()

packetOut := structs.GossipPacket{}
packetOut.Simple = &structs.SimpleMessage{
packet := structs.GossipPacket{}
packet.Simple = &structs.SimpleMessage{
OriginalName: name,
RelayPeerAddr: fmt.Sprintf("%s:%s", socketOut.IP, socketOut.Port),
RelayPeerAddr: fmt.Sprintf("%s:%s", socket.IP, socket.Port),
Contents: message.Text,
}

//send to everyone
for _, peer := range structs.Peers {
send.Gossip(conn, peer, packetOut)
send.Gossip(conn, peer, packet)
}
}
}

+ 2
- 16
client/main.go View File

@@ -2,11 +2,8 @@ package main

import (
"flag"
"fmt"
"log"
"net"

"github.com/sitilge/Peerster/send"
"github.com/sitilge/Peerster/serve"
"github.com/sitilge/Peerster/structs"
)

@@ -17,18 +14,7 @@ func main() {

flag.Parse()

//TODO - this should most probably must be moved to another package together with main app UDP stuff
raddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(*clientIP+":"+*clientPort))

if err != nil {
log.Fatalf("failed to resolve UDP socket: %s", err.Error())
}

conn, err := net.DialUDP("udp", nil, raddr)

if err != nil {
log.Fatalf("failed to dial UDP socket: %s", err.Error())
}
conn := serve.DialUDP(*clientIP, *clientPort)

message := structs.Message{
Text: *clientMessage,


+ 12
- 7
helpers/helpers.go View File

@@ -9,12 +9,15 @@ import (
)

func AddStatus(packet structs.GossipPacket) {
structs.Status = append(structs.Status, structs.StatusBuffer{
Identifier: packet.Rumor.Origin,
Index: packet.Rumor.ID,
Acknowledged: false,
Packet: packet,
})
//TODO - check if we don't send status gossip, which does not need to be ACKed
if packet.Rumor != nil {
structs.Status = append(structs.Status, structs.StatusBuffer{
Identifier: packet.Rumor.Origin,
Index: packet.Rumor.ID,
Acknowledged: false,
Packet: packet,
})
}
}

func AckStatus(status structs.PeerStatus) {
@@ -56,7 +59,9 @@ func ParsePeers(peers string) {
peer, err := ParseSocket(peersSegregated[i])

if err != nil {
fmt.Printf("unable to parse socket: %s", err)
fmt.Println(err)

return
}

//add to peers


+ 28
- 15
main.go View File

@@ -14,28 +14,48 @@ import (
func main() {
clientIP := flag.String("UIIP", "127.0.0.1", "Client IP")
clientPort := flag.String("UIPort", "8080", "Client port")
clientMode := flag.Bool("web", false, "Client web mode")
gossipSocket := flag.String("gossipAddr", "127.0.0.1:5000", "Peerster socket (ip:port)")
gossipName := flag.String("name", "Alpha", "Peerster name")
gossipPeers := flag.String("peers", "", "Peerster peers (ip1:port1,ip2:port2,...)")
gossipSimple := flag.Bool("simple", false, "Peerster simple mode")
gossipPeriodStatus := flag.Int("statusPeriod", 10, "Peerster status period")
gossipPeriodEntropy := flag.Int("antiEntropy", 10, "Peerster entropy period")
gossipPeriodStatus := flag.Int("statusPeriod", 1, "Peerster status period")
gossipPeriodEntropy := flag.Int("antiEntropy", 1, "Peerster entropy period")
profileCPU := flag.Bool("profile", true, "Profile CPU")
//profileMemory := flag.Bool("profileMemory", true, "Profile memory")

flag.Parse()

if *profileCPU {
//handle := profiling.Start(profiling.MemProfile)
//
//tickerProfile := time.NewTicker(time.Duration(30) * time.Second)
//
//go func() {
// for {
// select {
// case <-tickerProfile.C:
// //fmt.Printf("Profile %v %v \n", time.Now().Minute(), time.Now().Second())
//
// handle.Stop()
//
// os.Exit(0)
// }
// }
//}()
}

//parse the peers
helpers.ParsePeers(*gossipPeers)

socketGossip, err := helpers.ParseSocket(*gossipSocket)

if err != nil {
fmt.Printf("unable to parse socket: %s", err)
fmt.Println(err)
}

//create two connections, I need both for client + gossip conn for client, gossip conn for gossip
connClient := serve.ListenUDP(*clientIP, *clientPort)
connGossip := serve.ListenUDP(socketGossip.IP, socketGossip.Port)
connClient := serve.ListenUDP(*clientIP, *clientPort)
connWeb := serve.ConnectTCP(*clientIP, *clientPort, *gossipName, connGossip)

//Set a non-constant seed
rand.Seed(time.Now().UnixNano())
@@ -43,17 +63,10 @@ func main() {
var wg sync.WaitGroup
wg.Add(1)

go periodic.Pulse(connGossip, *gossipName, *gossipPeriodStatus, *gossipPeriodEntropy)

go periodic.Pulse(connGossip, *gossipPeriodStatus, *gossipPeriodEntropy, *profileCPU)
go serve.ConnectUDP(true, connClient, connGossip, socketGossip, *gossipName, *gossipSimple)

go serve.ConnectUDP(false, connClient, connGossip, socketGossip, *gossipName, *gossipSimple)

if *clientMode {
connWeb := serve.ConnectTCP(*clientIP, *clientPort, *gossipName, connGossip)

go serve.ListenTCP(connWeb)
}
go serve.ListenTCP(connWeb)

wg.Wait()
}

+ 12
- 17
periodic/periodic.go View File

@@ -1,7 +1,6 @@
package periodic

import (
"fmt"
"net"
"time"

@@ -10,20 +9,16 @@ import (
"github.com/sitilge/Peerster/structs"
)

func Pulse(conn *net.UDPConn, name string, periodStatus int, periodEntropy int) {
stopStatus := make(chan int)
stopEntropy := make(chan int)

var tickerStatus, tickerEntropy *time.Ticker

tickerStatus = time.NewTicker(time.Duration(periodStatus) * time.Second)
tickerEntropy = time.NewTicker(time.Duration(periodEntropy) * time.Second)
func Pulse(conn *net.UDPConn, periodStatus int, periodEntropy int, profileCPU bool) {
tickerStatus := time.NewTicker(time.Duration(periodStatus) * time.Second)
tickerEntropy := time.NewTicker(time.Duration(periodEntropy) * time.Second)
//tickerProfile := time.NewTicker(time.Duration(30) * time.Second)

go func() {
for {
select {
case <-tickerStatus.C:
fmt.Printf("Status %v %v \n", time.Now().Minute(), time.Now().Second())
//fmt.Printf("Status %v %v \n", time.Now().Minute(), time.Now().Second())

//TODO - because I start with the latest Rumour message that was appended to the Status buffer
for i := len(structs.Rumours) - 1; i >= 0; i-- {
@@ -33,8 +28,6 @@ func Pulse(conn *net.UDPConn, name string, periodStatus int, periodEntropy int)
if rumour.Origin == status.Identifier && status.Acknowledged == false {
peer := helpers.RandomPeer(structs.Socket{})

fmt.Printf("MONGERING with %s:%s \n", peer.IP, peer.Port)

packetOut := structs.GossipPacket{}
packetOut.Rumor = &rumour

@@ -54,14 +47,16 @@ func Pulse(conn *net.UDPConn, name string, periodStatus int, periodEntropy int)
//send to random
peer := helpers.RandomPeer(structs.Socket{})

//fmt.Println("entropy, sending status to ", peer)
go send.Status(conn, peer)

case <-stopStatus:
tickerStatus.Stop()
//case <-tickerProfile.C:
//fmt.Printf("Profile %v %v \n", time.Now().Minute(), time.Now().Second())

case <-stopEntropy:
tickerEntropy.Stop()
//if profile != "" {
// fmt.Println("Profiling time has run out!")
//
// os.Exit(0)
//}
}
}
}()


+ 0
- 0
profiles/.gitkeep View File


+ 9
- 3
scripts/build1.sh View File

@@ -4,11 +4,17 @@ clear

cd ./../

NAME=Alpha
PROF_CPU=cpu_${NAME}_`date +\%Y-\%m-\%d_\%H:\%M:\%S`.prof

go build main.go && ./main \
--UIIP="127.0.0.1" \
--UIPort="8080" \
--gossipAddr="127.0.0.1:5000" \
--name="Alpha" \
--name="${NAME}" \
--peers="127.0.0.1:5001" \
--web="true" \
--simple="false"
--simple="false" \
--profile
#--profile="profiles/${PROF_CPU}"

#ln -fs `pwd`/profiles/${PROF_CPU} `pwd`/profiles/cpu_${NAME}_latest.prof

+ 9
- 3
scripts/build2.sh View File

@@ -4,11 +4,17 @@ clear

cd ./../

NAME=Bravo
PROF_CPU=cpu_${NAME}_`date +\%Y-\%m-\%d_\%H:\%M:\%S`.prof

go build main.go && ./main \
--UIIP="127.0.0.1" \
--UIPort="8081" \
--gossipAddr="127.0.0.1:5001" \
--name="Bravo" \
--name="${NAME}" \
--peers="127.0.0.1:5002" \
--web="true" \
--simple="false"
--simple="false" \
--profile
#--profile="profiles/${PROF_CPU}"

#ln -fs `pwd`/profiles/${PROF_CPU} `pwd`/profiles/cpu_${NAME}_latest.prof

+ 9
- 3
scripts/build3.sh View File

@@ -4,11 +4,17 @@ clear

cd ./../

NAME=Charlie
PROF_CPU=cpu_${NAME}_`date +\%Y-\%m-\%d_\%H:\%M:\%S`.prof

go build main.go && ./main \
--UIIP="127.0.0.1" \
--UIPort="8082" \
--gossipAddr="127.0.0.1:5002" \
--name="Charlie" \
--name="${NAME}" \
--peers="127.0.0.1:5000" \
--web="true" \
--simple="false"
--simple="false" \
--profile
#--profile="profiles/${PROF_CPU}"

#ln -fs `pwd`/profiles/${PROF_CPU} `pwd`/profiles/cpu_${NAME}_latest.prof

+ 1
- 2
scripts/tar.sh View File

@@ -12,8 +12,7 @@ tar \
--exclude=${peersterdir}/README.md \
--exclude=${peersterdir}/client/client \
--exclude=${peersterdir}/client/main \
--exclude=${peersterdir}/_Downloads/* \
--exclude=${peersterdir}/_SharedFiles/* \
--exclude=${peersterdir}/profiles/* \
--exclude=${peersterdir}/docs \
--exclude=${peersterdir}/scripts \
--exclude=${peersterdir}/Peerster \


+ 2
- 2
scripts/test_1_ring.sh View File

@@ -29,7 +29,7 @@ do
peerPort=$((($gossipPort+1)%10+5000))
peer="127.0.0.1:$peerPort"
gossipAddr="127.0.0.1:$gossipPort"
./peerster -UIPort=$UIPort -gossipAddr=$gossipAddr -name=$name -simple -peers=$peer > $outFileName &
./Peerster -UIPort=$UIPort -gossipAddr=$gossipAddr -name=$name -simple -peers=$peer > $outFileName &
outputFiles+=("$outFileName")
if [[ "$DEBUG" == "true" ]] ; then
echo "$name running at UIPort $UIPort and gossipPort $gossipPort"
@@ -42,7 +42,7 @@ done
./client/client -UIPort=12349 -msg=$message
./client/client -UIPort=12346 -msg=$message2
sleep 3
pkill -f peerster
pkill -f Peerster

#testing
failed="F"


+ 2
- 2
scripts/test_2_ring.sh View File

@@ -32,7 +32,7 @@ do
peerPort=$((($gossipPort+1)%10+5000))
peer="127.0.0.1:$peerPort"
gossipAddr="127.0.0.1:$gossipPort"
./peerster -UIPort=$UIPort -gossipAddr=$gossipAddr -name=$name -peers=$peer > $outFileName &
./Peerster -UIPort=$UIPort -gossipAddr=$gossipAddr -name=$name -peers=$peer > $outFileName &
outputFiles+=("$outFileName")
if [[ "$DEBUG" == "true" ]] ; then
echo "$name running at UIPort $UIPort and gossipPort $gossipPort"
@@ -51,7 +51,7 @@ sleep 1
./client/client -UIPort=12351 -msg=$message_c3

sleep 10
pkill -f peerster
pkill -f Peerster

#testing
failed="F"


+ 26
- 17
send/send.go View File

@@ -2,7 +2,7 @@ package send

import (
"fmt"
"log"
"github.com/sitilge/Peerster/helpers"
"net"

"github.com/dedis/protobuf"
@@ -10,26 +10,35 @@ import (
)

func Gossip(conn *net.UDPConn, peer structs.Socket, packet structs.GossipPacket) {
raddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", peer.IP, peer.Port))
if peer.IP == "" || peer.Port == "" {
return
}

buff, err := protobuf.Encode(&packet)

if err != nil {
log.Fatalf("failed to resolve UDP socket: %s", err.Error())
}
fmt.Println(err)

if peer.IP == "" && peer.Port == "" {
return
}

buff, err := protobuf.Encode(&packet)
fmt.Printf("MONGERING with %s:%s \n", peer.IP, peer.Port)
helpers.PrintPeers()

helpers.AddStatus(packet)

raddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", peer.IP, peer.Port))

if err != nil {
log.Fatal(err)
fmt.Println(err.Error())

return
}

_, err = conn.WriteTo(buff, raddr)

if err != nil {
fmt.Printf("failed to send UDP datagram: %s", err.Error())
fmt.Println(err.Error())
}
}

@@ -37,17 +46,23 @@ func Message(conn *net.UDPConn, message structs.Message) {
buff, err := protobuf.Encode(&message)

if err != nil {
log.Fatal(err)
fmt.Println(err)

return
}

_, err = conn.Write(buff)

if err != nil {
fmt.Printf("failed to send UDP datagram: %s", err)
fmt.Println(err.Error())
}
}

func Status(conn *net.UDPConn, socket structs.Socket) {
func Status(conn *net.UDPConn, peer structs.Socket) {
if peer.IP == "" || peer.Port == "" {
return
}

packet := structs.GossipPacket{
Status: &structs.StatusPacket{
Want: []structs.PeerStatus{},
@@ -75,11 +90,5 @@ func Status(conn *net.UDPConn, socket structs.Socket) {
})
}

peer := structs.Socket{
IP: socket.IP,
Port: socket.Port,
}

//fmt.Println("Sending STATUS to ", peer)
Gossip(conn, peer, packet)
}

+ 30
- 17
serve/serve.go View File

@@ -14,30 +14,41 @@ import (
"github.com/sitilge/Peerster/web"
)

func ListenUDP(listenIP string, listenPort string) *net.UDPConn {
laddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", listenIP, listenPort))
func ListenUDP(IP string, port string) *net.UDPConn {
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", IP, port))

if err != nil {
log.Fatalf("failed to resolve UDP socket: %s", err.Error())
log.Fatalln(err.Error())
}

conn, err := net.ListenUDP("udp", laddr)
conn, err := net.ListenUDP("udp", addr)

if err != nil {
log.Fatalf("unable to listen to UDP socket: %s", err.Error())
log.Fatalln(err.Error())
}

return conn
}

func ConnectUDP(client bool, connClient *net.UDPConn, connGossip *net.UDPConn, socketOut structs.Socket, name string, simple bool) {
//TODO - check WHY and WHERE the memory allocated to buff gets changed, so I can't use it outside the loop
for {
//Remember that buffer must be big enough to hold the actual data PLUS the bytes for the struct itself
//eg. if DataReply.Data is 8 * 1024 bytes, then the buffer must be 8 * 1024 + struct structure bytes
//in this case, I am super generous, giving extra 1 * 1024 bytes
func DialUDP(IP string, port string) *net.UDPConn {
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", IP, port))
if err != nil {
log.Fatalln(err.Error())
}

buff := make([]byte, 8*1024+1*1024)
conn, err := net.DialUDP("udp", nil, addr)

if err != nil {
log.Fatalln(err.Error())
}

return conn
}

func ConnectUDP(client bool, connClient *net.UDPConn, connGossip *net.UDPConn, socketRemote structs.Socket, name string, simple bool) {
for {
buff := make([]byte, 1024)

var size int
var addr *net.UDPAddr
@@ -50,21 +61,23 @@ func ConnectUDP(client bool, connClient *net.UDPConn, connGossip *net.UDPConn, s
}

if err != nil {
log.Fatalf("read error: %s", err.Error())
log.Fatalln(err.Error())
}

socketIn, err := helpers.ParseSocket(addr.String())
socketLocal, err := helpers.ParseSocket(addr.String())

if err != nil {
fmt.Printf("unable to parse socket: %s", err.Error())
fmt.Println(err.Error())

return
}

//if not from client, add peers
if !client {
helpers.AddPeers(socketIn)
helpers.AddPeers(socketLocal)
}

go aggregate.Packet(client, connGossip, size, buff, socketIn, socketOut, name, simple)
go aggregate.Packet(client, connGossip, size, buff, socketLocal, socketRemote, name, simple)
}
}



+ 13
- 11
web/web.go View File

@@ -32,7 +32,7 @@ func (h *HomeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err != nil {
http.Error(w, http.StatusText(http.StatusUnprocessableEntity), http.StatusUnprocessableEntity)

fmt.Printf("unable to parse form: %s", err)
fmt.Println(err)

return
}
@@ -41,34 +41,34 @@ func (h *HomeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
peerValue := strings.Trim(r.FormValue("peer"), " ")

if messageValue != "" {
AggregateMessage(h, w, r, messageValue, peerValue)
AggregateMessage(h, messageValue)
} else if peerValue != "" {
AggregatePeer(h, w, r, messageValue, peerValue)
AggregatePeer(peerValue)
}
default:
http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
}
}

func AggregateMessage(h *HomeHandler, w http.ResponseWriter, r *http.Request, messageValue string, peerValue string) {
func AggregateMessage(h *HomeHandler, messageValue string) {
conn := h.Conn
socketIn := structs.Socket{}
socketOut := helpers.RandomPeer(structs.Socket{})
socket := structs.Socket{}
name := h.Name
simple := false

message := structs.Message{
Text: messageValue,
}

aggregate.PacketClientRumor(conn, socketIn, socketOut, message, name, simple)
aggregate.PacketClientRumor(conn, socket, message, name)
}

func AggregatePeer(h *HomeHandler, w http.ResponseWriter, r *http.Request, messageValue string, peerValue string) {
func AggregatePeer(peerValue string) {
peer, err := helpers.ParseSocket(peerValue)

if err != nil {
fmt.Printf("unable to parse socket: %s", err)
fmt.Println(err)

return
}

helpers.AddPeers(peer)
@@ -88,7 +88,9 @@ func ServeGET(w http.ResponseWriter, r *http.Request, data interface{}) {
output, err := json.Marshal(data)

if err != nil {
log.Fatalf("cannot encode to JSON: %s", err)
fmt.Println(err)

return
}

w.Header().Add("Content-Type", "application/json")


Loading…
Cancel
Save