package xray import ( "bufio" "bytes" "context" "encoding/json" "errors" "fmt" "io/fs" "os" "os/exec" "regexp" "runtime" "strings" "sync" "time" "x-ui/config" "x-ui/util/common" "github.com/Workiva/go-datastructures/queue" statsservice "github.com/xtls/xray-core/app/stats/command" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) var trafficRegex = regexp.MustCompile("(inbound|outbound)>>>([^>]+)>>>traffic>>>(downlink|uplink)") var ClientTrafficRegex = regexp.MustCompile("(user)>>>([^>]+)>>>traffic>>>(downlink|uplink)") func GetBinaryName() string { return fmt.Sprintf("xray-%s-%s", runtime.GOOS, runtime.GOARCH) } func GetBinaryPath() string { return config.GetBinFolderPath() + "/" + GetBinaryName() } func GetConfigPath() string { return config.GetBinFolderPath() + "/config.json" } func GetGeositePath() string { return config.GetBinFolderPath() + "/geosite.dat" } func GetGeoipPath() string { return config.GetBinFolderPath() + "/geoip.dat" } func GetIranPath() string { return config.GetBinFolderPath() + "/iran.dat" } func GetBlockedIPsPath() string { return config.GetBinFolderPath() + "/BlockedIps" } func stopProcess(p *Process) { p.Stop() } type Process struct { *process } func NewProcess(xrayConfig *Config) *Process { p := &Process{newProcess(xrayConfig)} runtime.SetFinalizer(p, stopProcess) return p } type process struct { cmd *exec.Cmd version string apiPort int config *Config lines *queue.Queue exitErr error } func newProcess(config *Config) *process { return &process{ version: "Unknown", config: config, lines: queue.New(100), } } func (p *process) IsRunning() bool { if p.cmd == nil || p.cmd.Process == nil { return false } if p.cmd.ProcessState == nil { return true } return false } func (p *process) GetErr() error { return p.exitErr } func (p *process) GetResult() string { if p.lines.Empty() && p.exitErr != nil { return p.exitErr.Error() } items, _ := p.lines.TakeUntil(func(item interface{}) bool { return true }) lines := make([]string, 0, len(items)) for _, item := range items { lines = append(lines, item.(string)) } return strings.Join(lines, "\n") } func (p *process) GetVersion() string { return p.version } func (p *Process) GetAPIPort() int { return p.apiPort } func (p *Process) GetConfig() *Config { return p.config } func (p *process) refreshAPIPort() { for _, inbound := range p.config.InboundConfigs { if inbound.Tag == "api" { p.apiPort = inbound.Port break } } } func (p *process) refreshVersion() { cmd := exec.Command(GetBinaryPath(), "-version") data, err := cmd.Output() if err != nil { p.version = "Unknown" } else { datas := bytes.Split(data, []byte(" ")) if len(datas) <= 1 { p.version = "Unknown" } else { p.version = string(datas[1]) } } } func (p *process) Start() (err error) { if p.IsRunning() { return errors.New("xray is already running") } defer func() { if err != nil { p.exitErr = err } }() data, err := json.MarshalIndent(p.config, "", " ") if err != nil { return common.NewErrorf("Failed to generate xray configuration file: %v", err) } configPath := GetConfigPath() err = os.WriteFile(configPath, data, fs.ModePerm) if err != nil { return common.NewErrorf("Failed to write configuration file: %v", err) } cmd := exec.Command(GetBinaryPath(), "-c", configPath, "-restrictedIPsPath", GetBlockedIPsPath()) p.cmd = cmd stdReader, err := cmd.StdoutPipe() if err != nil { return err } errReader, err := cmd.StderrPipe() if err != nil { return err } var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() reader := bufio.NewReaderSize(stdReader, 8192) for { line, _, err := reader.ReadLine() if err != nil { return } if p.lines.Len() >= 100 { p.lines.Get(1) } p.lines.Put(string(line)) } }() go func() { defer wg.Done() reader := bufio.NewReaderSize(errReader, 8192) for { line, _, err := reader.ReadLine() if err != nil { return } if p.lines.Len() >= 100 { p.lines.Get(1) } p.lines.Put(string(line)) } }() go func() { err := cmd.Run() if err != nil { p.exitErr = err } wg.Wait() }() p.refreshVersion() p.refreshAPIPort() return nil } func (p *process) Stop() error { if !p.IsRunning() { return errors.New("xray is not running") } return p.cmd.Process.Kill() } func (p *process) GetTraffic(reset bool) ([]*Traffic, []*ClientTraffic, error) { if p.apiPort == 0 { return nil, nil, common.NewError("xray api port wrong:", p.apiPort) } conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%v", p.apiPort), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, nil, err } defer conn.Close() client := statsservice.NewStatsServiceClient(conn) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() request := &statsservice.QueryStatsRequest{ Reset_: reset, } resp, err := client.QueryStats(ctx, request) if err != nil { return nil, nil, err } tagTrafficMap := map[string]*Traffic{} emailTrafficMap := map[string]*ClientTraffic{} clientTraffics := make([]*ClientTraffic, 0) traffics := make([]*Traffic, 0) for _, stat := range resp.GetStat() { matchs := trafficRegex.FindStringSubmatch(stat.Name) if len(matchs) < 3 { matchs := ClientTrafficRegex.FindStringSubmatch(stat.Name) if len(matchs) < 3 { continue } else { isUser := matchs[1] == "user" email := matchs[2] isDown := matchs[3] == "downlink" if !isUser { continue } traffic, ok := emailTrafficMap[email] if !ok { traffic = &ClientTraffic{ Email: email, } emailTrafficMap[email] = traffic clientTraffics = append(clientTraffics, traffic) } if isDown { traffic.Down = stat.Value } else { traffic.Up = stat.Value } } continue } isInbound := matchs[1] == "inbound" tag := matchs[2] isDown := matchs[3] == "downlink" if tag == "api" { continue } traffic, ok := tagTrafficMap[tag] if !ok { traffic = &Traffic{ IsInbound: isInbound, Tag: tag, } tagTrafficMap[tag] = traffic traffics = append(traffics, traffic) } if isDown { traffic.Down = stat.Value } else { traffic.Up = stat.Value } } return traffics, clientTraffics, nil }