HLS streaming from OBS with NDI plugin
Du kannst nicht mehr als 25 Themen auswählen Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

202 Zeilen
5.8KB

  1. #!/usr/bin/env python3
  2. import os
  3. import signal
  4. import subprocess
  5. import threading
  6. from colored import fg, attr
  7. from pathlib import Path
  8. class Logger:
  9. COLORS = [
  10. fg(1), # red
  11. fg(2), # green
  12. fg(3), # yellow
  13. fg(4), # blue
  14. fg(5), # magenta
  15. fg(6), # cyan
  16. ]
  17. COLORS_SIZE = len(COLORS)
  18. NAME_LENGTH = 5
  19. RESET = attr(0)
  20. __index = 0
  21. __lock = threading.Semaphore()
  22. @staticmethod
  23. def get(name):
  24. color = Logger.COLORS[Logger.__index % Logger.COLORS_SIZE]
  25. Logger.__index += 1
  26. return Logger(name, color)
  27. def __init__(self, name, color):
  28. name = name.rjust(Logger.NAME_LENGTH, " ")
  29. self.__prefix = "[%s%s%s] " % (color, name, Logger.RESET)
  30. def log(self, line, end="\n"):
  31. if isinstance(line, bytes):
  32. line = line.decode()
  33. with Logger.__lock:
  34. print(self.__prefix + line, end=end)
  35. def stdout(self, line, end="\n"):
  36. self.log(line, end=end)
  37. def stderr(self, line, end="\n"):
  38. if isinstance(line, bytes):
  39. line = line.decode()
  40. line = "%s%s%s" % (fg(1), line, Logger.RESET)
  41. self.log(line, end=end)
  42. class Cmd:
  43. @staticmethod
  44. def process(io, logger):
  45. for line in io:
  46. logger(line, end="")
  47. def __init__(self, logger, *args, **kwargs):
  48. self.__logger = logger
  49. self.__args = args
  50. self.__kwargs = kwargs
  51. self.__process = None
  52. def run(self):
  53. env = self.__kwargs.pop("env", None)
  54. if env:
  55. env = os.environ.update(env)
  56. cwd = self.__kwargs.pop("cwd", None)
  57. # self.__logger.log(" ".join(self.__args))
  58. self.__process = subprocess.Popen(self.__args,
  59. stdin=subprocess.PIPE,
  60. stdout=subprocess.PIPE,
  61. stderr=subprocess.PIPE, env=env,
  62. cwd=cwd)
  63. threads = [
  64. threading.Thread(target=Cmd.process, args=[self.__process.stdout,
  65. self.__logger.stdout]),
  66. threading.Thread(target=Cmd.process,
  67. args=[self.__process.stderr, self.__logger.stderr])
  68. ]
  69. stdin = self.__kwargs.get("stdin")
  70. if stdin:
  71. self.__process.stdin.write(stdin)
  72. [thread.start() for thread in threads]
  73. retcode = self.__process.wait()
  74. for thread in threads:
  75. thread.join()
  76. self.__process = None
  77. # self.__logger.log(" ".join(self.__args) + ": " + str(retcode))
  78. if retcode:
  79. raise subprocess.CalledProcessError(retcode, self.__args)
  80. def signal(self, signal=signal.SIGTERM):
  81. if self.__process:
  82. try:
  83. self.__process.send_signal(signal)
  84. except:
  85. pass
  86. @staticmethod
  87. def rsync(logger, *args):
  88. cmd = ["rsync", "-r", "--info=name", "--partial", "--inplace", "--delete",
  89. "--chmod=ug=rwX,o=rX",
  90. "-e", "ssh -o ControlMaster=no -o ControlPath=none"] \
  91. + list(args)
  92. return Cmd(logger, *cmd)
  93. class Loop(threading.Thread):
  94. def __init__(self, logger, cmd):
  95. super().__init__()
  96. self.__logger = logger
  97. # if not isinstance(cmd, Cmd):
  98. # cmd = Cmd(self.__logger, cmd)
  99. self.__cmd = cmd
  100. self.__current = None
  101. self.__event = threading.Event()
  102. self.__running = False
  103. def run(self):
  104. self.__running = True
  105. while self.__running:
  106. try:
  107. # self.__logger.stdout("Run")
  108. for cmd in self.__cmd:
  109. self.__current = cmd.run()
  110. self.__current = None
  111. # self.__logger.stdout("Sleep")
  112. self.__event.clear()
  113. self.__event.wait(1)
  114. # self.__logger.stdout("End sleep")
  115. except:
  116. pass
  117. # finally:
  118. # self.__logger.stdout("Runned")
  119. # self.__logger.stdout("Exit")
  120. def stop(self):
  121. if self.__running:
  122. # self.__logger.stdout("Stopping")
  123. self.__running = False
  124. if self.__current:
  125. self.__current.signal()
  126. self.__event.set()
  127. # self.__logger.stdout("Stopping ok")
  128. self.join()
  129. # self.__logger.stdout("Stopped")
  130. class Stream(Loop):
  131. def __init__(self, name, target):
  132. logger = Logger.get(name)
  133. folder = os.path.join("stream", name) + "/"
  134. target_stream = os.path.join(target, name) + "/"
  135. cmd1 = Cmd.rsync(logger, folder, target_stream)
  136. index = os.path.join("stream", name + ".m3u8")
  137. cmd2 = Cmd.rsync(logger, index, target)
  138. super().__init__(logger, [cmd1, cmd2])
  139. class Streams:
  140. NAMES = ["360p", "480p", "720p", "1080p", "audio"]
  141. # NAMES = ["1080p", "720p"]
  142. def __init__(self, target):
  143. self.__streams = []
  144. for name in Streams.NAMES:
  145. folder = os.path.join("stream", name)
  146. if Path(folder).is_dir():
  147. stream = Stream(name, target)
  148. self.__streams.append(stream)
  149. def start(self):
  150. for stream in self.__streams:
  151. stream.start()
  152. def stop(self):
  153. for stream in self.__streams:
  154. stream.stop()
  155. if __name__ == "__main__":
  156. lock = threading.Event()
  157. #streams = Streams("rabbit:/srv/www/fr.passageenseine/upstream/stream1")
  158. #streams = Streams("rabbit:/var/www/stream/stream1")
  159. streams = Streams("bespin:/srv/www/fr.imirhil/stream")
  160. def signal_handler(_1, _2):
  161. streams.stop()
  162. lock.set()
  163. signal.signal(signal.SIGINT, signal_handler)
  164. streams.start()
  165. lock.wait()