Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Igp graph processor #81

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions Makefile.igp-graph
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
REGISTRY_NAME?=docker.io/iejalapeno
IMAGE_VERSION?=latest

.PHONY: all igp-graph container push clean test

ifdef V
TESTARGS = -v -args -alsologtostderr -v 5
else
TESTARGS =
endif

all: igp-graph

igp-graph:
mkdir -p bin
$(MAKE) -C ./cmd/igp-graph compile-igp-graph

igp-graph-container: igp-graph
docker build -t $(REGISTRY_NAME)/igp-graph:$(IMAGE_VERSION) -f ./build/Dockerfile.igp-graph .

push: igp-graph-container
docker push $(REGISTRY_NAME)/igp-graph:$(IMAGE_VERSION)

clean:
rm -rf bin

test:
GO111MODULE=on go test `go list ./... | grep -v 'vendor'` $(TESTARGS)
GO111MODULE=on go vet `go list ./... | grep -v vendor`
4 changes: 4 additions & 0 deletions build/Dockerfile.igp-graph
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FROM scratch

COPY ./bin/igp-graph /igp-graph
ENTRYPOINT ["/igp-graph"]
2 changes: 2 additions & 0 deletions cmd/igp-graph/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
compile-igp-graph:
CGO_ENABLED=0 GOOS=linux GO111MODULE=on go build -a -ldflags '-extldflags "-static"' -o ../../bin/igp-graph ./main.go
200 changes: 200 additions & 0 deletions cmd/igp-graph/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Copyright (c) 2024 Cisco Systems, Inc. and its affiliates
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//
// The contents of this file are licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

package main

import (
"flag"
"fmt"
"io"
"os"
"os/signal"
"runtime"

"github.com/cisco-open/jalapeno/igp-graph/arangodb"
"github.com/cisco-open/jalapeno/igp-graph/kafkamessenger"
"github.com/cisco-open/jalapeno/topology/kafkanotifier"
"github.com/golang/glog"

_ "net/http/pprof"
)

const (
// userFile defines the name of file containing base64 encoded user name
userFile = "./credentials/.username"
// passFile defines the name of file containing base64 encoded password
passFile = "./credentials/.password"
// MAXUSERNAME defines maximum length of ArangoDB user name
MAXUSERNAME = 256
// MAXPASS defines maximum length of ArangoDB password
MAXPASS = 256
)

var (
msgSrvAddr string
dbSrvAddr string
dbName string
dbUser string
dbPass string
lsprefix string
lslink string
lssrv6sid string
lsnode string
lsnodeExt string
igpDomain string
lsv4Graph string
lsv6Graph string
)

func init() {
runtime.GOMAXPROCS(1)
flag.StringVar(&msgSrvAddr, "message-server", "", "URL to the messages supplying server")
flag.StringVar(&dbSrvAddr, "database-server", "", "{dns name}:port or X.X.X.X:port of the graph database")
flag.StringVar(&dbName, "database-name", "", "DB name")
flag.StringVar(&dbUser, "database-user", "", "DB User name")
flag.StringVar(&dbPass, "database-pass", "", "DB User's password")

flag.StringVar(&lsprefix, "ls_prefix", "ls_prefix", "ls_prefix Collection name, default: \"ls_prefix\"")
flag.StringVar(&lslink, "ls_link", "ls_link", "ls_link Collection name, default \"ls_link\"")
flag.StringVar(&lssrv6sid, "ls_srv6_sid", "ls_srv6_sid", "ls_srv6_sid Collection name, default: \"ls_srv6_sid\"")
flag.StringVar(&lsnode, "ls_node", "ls_node", "ls_node Collection name, default \"ls_node\"")
flag.StringVar(&lsnodeExt, "ls_node_extended", "ls_node_extended", "ls_node_extended Collection name, default \"ls_node_extended\"")
flag.StringVar(&igpDomain, "igp_domain", "igp_domain", "igp_domain Collection name, default \"igp_domain\"")
flag.StringVar(&lsv4Graph, "lsv4_graph", "lsv4_graph", "lsv4_graph Collection name, default \"lsv4_graph\"")
flag.StringVar(&lsv6Graph, "lsv6_graph", "lsv6_graph", "lsv6_graph Collection name, default \"lsv6_graph\"")
}

var (
onlyOneSignalHandler = make(chan struct{})
shutdownSignals = []os.Signal{os.Interrupt}
)

func setupSignalHandler() (stopCh <-chan struct{}) {
close(onlyOneSignalHandler) // panics when called twice

stop := make(chan struct{})
c := make(chan os.Signal, 2)
signal.Notify(c, shutdownSignals...)
go func() {
<-c
close(stop)
<-c
os.Exit(1) // second signal. Exit directly.
}()

return stop
}

func main() {
flag.Parse()
_ = flag.Set("logtostderr", "true")

// validateDBCreds check if the user name and the password are provided either as
// command line parameters or via files. If both are provided command line parameters
// will be used, if neither, processor will fail.
if err := validateDBCreds(); err != nil {
glog.Errorf("failed to validate the database credentials with error: %+v", err)
os.Exit(1)
}

// initialize kafkanotifier to write back processed events into ls_node_edge_events topic
notifier, err := kafkanotifier.NewKafkaNotifier(msgSrvAddr)
if err != nil {
glog.Errorf("failed to initialize events notifier with error: %+v", err)
os.Exit(1)
}

dbSrv, err := arangodb.NewDBSrvClient(dbSrvAddr, dbUser, dbPass, dbName, lsprefix, lslink, lssrv6sid, lsnode,
lsnodeExt, igpDomain, lsv4Graph, lsv6Graph, notifier)
if err != nil {
glog.Errorf("failed to initialize databse client with error: %+v", err)
os.Exit(1)
}

if err := dbSrv.Start(); err != nil {
if err != nil {
glog.Errorf("failed to connect to database with error: %+v", err)
os.Exit(1)
}
}

// Initializing messenger process
msgSrv, err := kafkamessenger.NewKafkaMessenger(msgSrvAddr, dbSrv.GetInterface())
if err != nil {
glog.Errorf("failed to initialize message server with error: %+v", err)
os.Exit(1)
}

msgSrv.Start()

stopCh := setupSignalHandler()
<-stopCh

msgSrv.Stop()
dbSrv.Stop()

os.Exit(0)
}

func validateDBCreds() error {
// Attempting to access username and password files.
u, err := readAndDecode(userFile, MAXUSERNAME)
if err != nil {
if dbUser != "" && dbPass != "" {
return nil
}
return fmt.Errorf("failed to access %s with error: %+v and no username and password provided via command line arguments", userFile, err)
}
p, err := readAndDecode(passFile, MAXPASS)
if err != nil {
if dbUser != "" && dbPass != "" {
return nil
}
return fmt.Errorf("failed to access %s with error: %+v and no username and password provided via command line arguments", passFile, err)
}
dbUser, dbPass = u, p

return nil
}

func readAndDecode(fn string, max int) (string, error) {
f, err := os.Open(fn)
if err != nil {
return "", err
}
defer f.Close()
l, err := f.Stat()
if err != nil {
return "", err
}
b := make([]byte, int(l.Size()))
n, err := io.ReadFull(f, b)
if err != nil {
return "", err
}
if n > max {
return "", fmt.Errorf("length of data %d exceeds maximum acceptable length: %d", n, max)
}
b = b[:n]

return string(b), nil
}
36 changes: 36 additions & 0 deletions deployment/igp-graph.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
---
apiVersion: apps/v1
kind: Deployment
spec:
replicas: 1
selector:
matchLabels:
app: igp-graph
template:
metadata:
labels:
app: igp-graph
spec:
containers:
- args:
- --v
- "5"
- --message-server
- "broker.jalapeno:9092"
- --database-server
- "http://arangodb.jalapeno:8529"
- --database-name
- "jalapeno"
image: docker.io/iejalapeno/igp-graph:latest
imagePullPolicy: Always
name: igp-graph
volumeMounts:
- name: credentials
mountPath: /credentials
volumes:
- name: credentials
secret:
secretName: jalapeno
metadata:
name: igp-graph
namespace: jalapeno
100 changes: 100 additions & 0 deletions igp-graph/arangodb/arango-conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (c) 2024 Cisco Systems, Inc. and its affiliates
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//
// The contents of this file are licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

package arangodb

import (
"context"
"errors"
"fmt"
"strings"

driver "github.com/arangodb/go-driver"
"github.com/arangodb/go-driver/http"
"github.com/golang/glog"
)

var (
ErrEmptyConfig = errors.New("ArangoDB Config has an empty field")
ErrUpSafe = errors.New("Failed to UpdateSafe. Requires *DBObjects")
ErrNilObject = errors.New("Failed to operate on NIL object")
ErrNotFound = errors.New("Document not found")
)

type ArangoConfig struct {
URL string `desc:"Arangodb server URL (http://127.0.0.1:8529)"`
User string `desc:"Arangodb server username"`
Password string `desc:"Arangodb server user password"`
Database string `desc:"Arangodb database name"`
}

func NewConfig() ArangoConfig {
return ArangoConfig{}
}

type ArangoConn struct {
db driver.Database
}

var (
ErrCollectionNotFound = fmt.Errorf("Could not find collection")
)

func NewArango(cfg ArangoConfig) (*ArangoConn, error) {
// Connect to DB
if cfg.URL == "" || cfg.User == "" || cfg.Password == "" || cfg.Database == "" {
return nil, ErrEmptyConfig
}
if !strings.Contains(cfg.URL, "http") {
cfg.URL = "http://" + cfg.URL
}
conn, err := http.NewConnection(http.ConnectionConfig{
Endpoints: []string{cfg.URL},
})
if err != nil {
glog.Errorf("Failed to create HTTP connection: %v", err)
return nil, err
}

// Authenticate with DB
conn, err = conn.SetAuthentication(driver.BasicAuthentication(cfg.User, cfg.Password))
if err != nil {
glog.Errorf("Failed to authenticate with arango: %v", err)
return nil, err
}

c, err := driver.NewClient(driver.ClientConfig{
Connection: conn,
})
if err != nil {
glog.Errorf("Failed to create client: %v", err)
return nil, err
}

// If Jalapeno databse does not exist, the topology is not running, goig into a crash loop
db, err := c.Database(context.Background(), cfg.Database)
if err != nil {
return nil, err
}

return &ArangoConn{db: db}, nil
}
Loading
Loading