Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates and refactors #75

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
20 changes: 10 additions & 10 deletions chaturbate/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Channel struct {
IsPaused bool
isStopped bool
Logs []string
logType logType
LogType LogType

bufferLock sync.Mutex
buffer map[int][]byte
Expand All @@ -61,32 +61,32 @@ type Channel struct {
// Run
func (w *Channel) Run() {
if w.Username == "" {
w.log(logTypeError, "username is empty, use `-u USERNAME` to specify")
w.log(LogTypeError, "username is empty, use `-u USERNAME` to specify")
return
}

for {
if w.IsPaused {
w.log(logTypeInfo, "channel is paused")
w.log(LogTypeInfo, "channel is paused")
<-w.ResumeChannel // blocking
w.log(logTypeInfo, "channel is resumed")
w.log(LogTypeInfo, "channel is resumed")
}
if w.isStopped {
w.log(logTypeInfo, "channel is stopped")
w.log(LogTypeInfo, "channel is stopped")
break
}

body, err := w.requestChannelBody()
if err != nil {
w.log(logTypeError, "body request error: %w", err)
w.log(LogTypeError, "body request error: %v", err)
}
if strings.Contains(body, "playlist.m3u8") {
w.IsOnline = true
w.LastStreamedAt = time.Now().Format("2006-01-02 15:04:05")
w.log(logTypeInfo, "channel is online, start fetching...")
w.log(LogTypeInfo, "channel is online, start fetching...")

if err := w.record(body); err != nil { // blocking
w.log(logTypeError, "record error: %w", err)
w.log(LogTypeError, "record error: %v", err)
}
continue // this excutes when recording is over/interrupted
}
Expand All @@ -95,11 +95,11 @@ func (w *Channel) Run() {
// close file when offline so user can move/delete it
if w.file != nil {
if err := w.releaseFile(); err != nil {
w.log(logTypeError, "release file: %w", err)
w.log(LogTypeError, "release file: %v", err)
}
}

w.log(logTypeInfo, "channel is offline, check again %d min(s) later", w.Interval)
w.log(LogTypeInfo, "channel is offline, check again %d min(s) later", w.Interval)
<-time.After(time.Duration(w.Interval) * time.Minute) // minutes cooldown to check online status
}
}
Expand Down
60 changes: 36 additions & 24 deletions chaturbate/channel_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ import (
"time"
)

// filename
// filename generates the filename based on the session pattern and current split index.
func (w *Channel) filename() (string, error) {
data := w.sessionPattern
if data == nil {
data = map[string]any{
if w.sessionPattern == nil {
w.sessionPattern = map[string]any{
"Username": w.Username,
"Year": time.Now().Format("2006"),
"Month": time.Now().Format("01"),
Expand All @@ -23,69 +22,82 @@ func (w *Channel) filename() (string, error) {
"Second": time.Now().Format("05"),
"Sequence": 0,
}
w.sessionPattern = data
} else {
data["Sequence"] = w.splitIndex
}
t, err := template.New("filename").Parse(w.filenamePattern)

w.sessionPattern["Sequence"] = w.splitIndex

var buf bytes.Buffer
tmpl, err := template.New("filename").Parse(w.filenamePattern)
if err != nil {
return "", err
return "", fmt.Errorf("filename pattern error: %w", err)
}
var buf bytes.Buffer
if err := t.Execute(&buf, data); err != nil {
return "", err
if err := tmpl.Execute(&buf, w.sessionPattern); err != nil {
return "", fmt.Errorf("template execution error: %w", err)
}

return buf.String(), nil
}

// newFile
// newFile creates a new file and prepares it for writing stream data.
func (w *Channel) newFile() error {
filename, err := w.filename()
if err != nil {
return fmt.Errorf("filename pattern error: %w", err)
return err
}

if err := os.MkdirAll(filepath.Dir(filename), 0777); err != nil {
return fmt.Errorf("create folder: %w", err)
}

file, err := os.OpenFile(filename+".ts", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0777)
if err != nil {
return fmt.Errorf("cannot open file: %s: %w", filename, err)
}
w.log(logTypeInfo, "the stream will be saved as %s.ts", filename)

w.log(LogTypeInfo, "the stream will be saved as %s.ts", filename)
w.file = file
return nil
}

// releaseFile
// releaseFile closes the current file and removes it if empty.
func (w *Channel) releaseFile() error {
if w.file == nil {
return nil
}
// close the file to remove it

if err := w.file.Close(); err != nil {
return fmt.Errorf("close file: %s: %w", w.file.Name(), err)
}
// remove it if it was empty
if w.SegmentFilesize == 0 {
w.log(logTypeInfo, "%s was removed because it was empty", w.file.Name())

if w.SegmentFilesize == 0 {
w.log(LogTypeInfo, "%s was removed because it was empty", w.file.Name())
if err := os.Remove(w.file.Name()); err != nil {
return fmt.Errorf("remove zero file: %s: %w", w.file.Name(), err)
}
}

w.file = nil
return nil
}

// nextFile
func (w *Channel) nextFile() error {
// nextFile handles the transition to a new file segment, ensuring correct timing.
func (w *Channel) nextFile(startTime time.Time) error {
// Release the current file before creating a new one.
if err := w.releaseFile(); err != nil {
w.log(logTypeError, "release file: %w", err)
w.log(LogTypeError, "release file: %v", err)
return err
}

// Increment the split index for the next file.
w.splitIndex++

// Reset segment data.
w.SegmentFilesize = 0
w.SegmentDuration = 0

// Calculate the actual segment duration using the elapsed time.
elapsed := int(time.Since(startTime).Minutes())
w.SegmentDuration = elapsed

// Create the new file.
return w.newFile()
}
68 changes: 44 additions & 24 deletions chaturbate/channel_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,17 @@ func (w *Channel) resolveSource(body string) (string, string, error) {
if variant == nil {
return "", "", fmt.Errorf("no available resolution")
}
w.log(logTypeInfo, "resolution %dp is used", variant.width)
w.log(LogTypeInfo, "resolution %dp is used", variant.width)

url, ok := variant.framerate[w.Framerate]
// If the framerate is not found, fallback to the first found framerate, this block pretends there're only 30 and 60 fps.
// no complex logic here, im lazy.
if ok {
w.log(logTypeInfo, "framerate %dfps is used", w.Framerate)
w.log(LogTypeInfo, "framerate %dfps is used", w.Framerate)
} else {
for k, v := range variant.framerate {
url = v
w.log(logTypeWarning, "framerate %dfps not found, fallback to %dfps", w.Framerate, k)
w.log(LogTypeWarning, "framerate %dfps not found, fallback to %dfps", w.Framerate, k)
w.Framerate = k
break
}
Expand All @@ -196,66 +196,86 @@ func (w *Channel) resolveSource(body string) (string, string, error) {
return rootURL, sourceURL, nil
}

// mergeSegments is a async function that runs in background for the channel,
// and it merges the segments from buffer to the file.
// mergeSegments runs in the background and merges segments from the buffer to the file.
func (w *Channel) mergeSegments() {
var segmentRetries int
startTime := time.Now() // Track the start time of the current segment.

for {
if w.IsPaused || w.isStopped {
break
}

// Handle segment retries if not found.
if segmentRetries > 5 {
w.log(logTypeWarning, "segment #%d not found in buffer, skipped", w.bufferIndex)
w.log(LogTypeWarning, "segment #%d not found in buffer, skipped", w.bufferIndex)
w.bufferIndex++
segmentRetries = 0
continue
}

// If buffer is empty, wait and retry.
if len(w.buffer) == 0 {
<-time.After(1 * time.Second)
time.Sleep(1 * time.Second)
continue
}

// Retrieve segment from buffer.
w.bufferLock.Lock()
buf, ok := w.buffer[w.bufferIndex]
w.bufferLock.Unlock()

if !ok {
segmentRetries++
<-time.After(time.Duration(segmentRetries) * time.Second)
time.Sleep(time.Duration(segmentRetries) * time.Second)
continue
}

// Write the segment to the file.
lens, err := w.file.Write(buf)
if err != nil {
w.log(logTypeError, "segment #%d written error: %v", w.bufferIndex, err)
w.log(LogTypeError, "segment #%d written error: %v", w.bufferIndex, err)
w.retries++
continue
}
w.log(logTypeInfo, "segment #%d written", w.bufferIndex)
w.log(logTypeDebug, "duration: %s, size: %s", DurationStr(w.SegmentDuration), ByteStr(w.SegmentFilesize))

// Update segment size and log progress.
w.SegmentFilesize += lens
segmentRetries = 0
w.log(LogTypeInfo, "segment #%d written", w.bufferIndex)
w.log(LogTypeDebug, "duration: %s, size: %s", DurationStr(w.SegmentDuration), ByteStr(w.SegmentFilesize))

// Check if the file size limit has been reached.
if w.SplitFilesize > 0 && w.SegmentFilesize >= w.SplitFilesize*1024*1024 {
w.log(logTypeInfo, "filesize exceeded, creating new file")
w.log(LogTypeInfo, "filesize exceeded, creating new file")

if err := w.nextFile(); err != nil {
w.log(logTypeError, "next file error: %v", err)
if err := w.nextFile(startTime); err != nil {
w.log(LogTypeError, "next file error: %v", err)
break
}
} else if w.SplitDuration > 0 && w.SegmentDuration >= w.SplitDuration*60 {
w.log(logTypeInfo, "duration exceeded, creating new file")

if err := w.nextFile(); err != nil {
w.log(logTypeError, "next file error: %v", err)
startTime = time.Now() // Reset start time for the new segment.
}

// Check if the duration limit has been reached.
elapsed := int(time.Since(startTime).Minutes())
if w.SplitDuration > 0 && elapsed >= w.SplitDuration {
w.log(LogTypeInfo, "duration exceeded, creating new file")

if err := w.nextFile(startTime); err != nil {
w.log(LogTypeError, "next file error: %v", err)
break
}

startTime = time.Now() // Reset start time for the new segment.
}

// Remove the processed segment from the buffer.
w.bufferLock.Lock()
delete(w.buffer, w.bufferIndex)
w.bufferLock.Unlock()

w.bufferIndex++
w.bufferIndex++ // Move to the next segment.
segmentRetries = 0 // Reset retries for the next segment.
}
}

Expand All @@ -276,15 +296,15 @@ func (w *Channel) fetchSegments() {
break
}

w.log(logTypeError, "segment list error, will try again [%d/10]: %v", disconnectRetries, err)
w.log(LogTypeError, "segment list error, will try again [%d/10]: %v", disconnectRetries, err)
disconnectRetries++

<-time.After(time.Duration(wait) * time.Second)
continue
}

if disconnectRetries > 0 {
w.log(logTypeInfo, "channel is back online!")
w.log(LogTypeInfo, "channel is back online!")
w.IsOnline = true
disconnectRetries = 0
}
Expand All @@ -296,7 +316,7 @@ func (w *Channel) fetchSegments() {

go func(index int, uri string) {
if err := w.requestSegment(uri, index); err != nil {
w.log(logTypeError, "segment #%d request error, ignored: %v", index, err)
w.log(LogTypeError, "segment #%d request error, ignored: %v", index, err)
return
}
}(w.segmentIndex, v.URI)
Expand Down Expand Up @@ -379,7 +399,7 @@ func (w *Channel) requestSegment(url string, index int) error {
return fmt.Errorf("read body: %w", err)
}

w.log(logTypeDebug, "segment #%d fetched", index)
w.log(LogTypeDebug, "segment #%d fetched", index)

w.bufferLock.Lock()
w.buffer[index] = body
Expand Down
33 changes: 14 additions & 19 deletions chaturbate/channel_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,28 @@ import (
"time"
)

type logType string

const (
logTypeDebug logType = "DEBUG"
logTypeInfo logType = "INFO"
logTypeWarning logType = "WARN"
logTypeError logType = "ERROR"
)

// log
func (w *Channel) log(typ logType, message string, v ...interface{}) {
switch w.logType {
case logTypeInfo:
if typ == logTypeDebug {
func (w *Channel) log(typ LogType, message string, v ...interface{}) {
// Check the global log level
currentLogLevel := GetGlobalLogLevel()

switch currentLogLevel {
case LogTypeInfo:
if typ == LogTypeDebug {
return
}
case logTypeWarning:
if typ == logTypeDebug || typ == logTypeInfo {
case LogTypeWarning:
if typ == LogTypeDebug || typ == LogTypeInfo {
return
}
case logTypeError:
if typ == logTypeDebug || typ == logTypeInfo || typ == logTypeWarning {
case LogTypeError:
if typ == LogTypeDebug || typ == LogTypeInfo || typ == LogTypeWarning {
return
}
}

updateLog := fmt.Sprintf("[%s] [%s] %s", time.Now().Format("2006-01-02 15:04:05"), typ, fmt.Errorf(message, v...))
consoleLog := fmt.Sprintf("[%s] [%s] [%s] %s", time.Now().Format("2006-01-02 15:04:05"), typ, w.Username, fmt.Errorf(message, v...))
updateLog := fmt.Sprintf("[%s] [%s] %s", time.Now().Format("2006-01-02 15:04:05"), typ, fmt.Sprintf(message, v...))
consoleLog := fmt.Sprintf("[%s] [%s] [%s] %s", time.Now().Format("2006-01-02 15:04:05"), typ, w.Username, fmt.Sprintf(message, v...))

update := &Update{
Username: w.Username,
Expand All @@ -43,6 +37,7 @@ func (w *Channel) log(typ logType, message string, v ...interface{}) {
SegmentDuration: w.SegmentDuration,
SegmentFilesize: w.SegmentFilesize,
}

if w.file != nil {
update.Filename = w.file.Name()
}
Expand Down
Loading