HLS streaming from OBS with NDI plugin
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

202 lignes
5.8KB

  1. #!/usr/bin/env python3
  2. import os
  3. import signal
  4. import subprocess
  5. import threading
  6. from colored import fg, attr
  7. from pathlib import Path
  8. class Logger:
  9. COLORS = [
  10. fg(1), # red
  11. fg(2), # green
  12. fg(3), # yellow
  13. fg(4), # blue
  14. fg(5), # magenta
  15. fg(6), # cyan
  16. ]
  17. COLORS_SIZE = len(COLORS)
  18. NAME_LENGTH = 5
  19. RESET = attr(0)
  20. __index = 0
  21. __lock = threading.Semaphore()
  22. @staticmethod
  23. def get(name):
  24. color = Logger.COLORS[Logger.__index % Logger.COLORS_SIZE]
  25. Logger.__index += 1
  26. return Logger(name, color)
  27. def __init__(self, name, color):
  28. name = name.rjust(Logger.NAME_LENGTH, " ")
  29. self.__prefix = "[%s%s%s] " % (color, name, Logger.RESET)
  30. def log(self, line, end="\n"):
  31. if isinstance(line, bytes):
  32. line = line.decode()
  33. with Logger.__lock:
  34. print(self.__prefix + line, end=end)
  35. def stdout(self, line, end="\n"):
  36. self.log(line, end=end)
  37. def stderr(self, line, end="\n"):
  38. if isinstance(line, bytes):
  39. line = line.decode()
  40. line = "%s%s%s" % (fg(1), line, Logger.RESET)
  41. self.log(line, end=end)
  42. class Cmd:
  43. @staticmethod
  44. def process(io, logger):
  45. for line in io:
  46. logger(line, end="")
  47. def __init__(self, logger, *args, **kwargs):
  48. self.__logger = logger
  49. self.__args = args
  50. self.__kwargs = kwargs
  51. self.__process = None
  52. def run(self):
  53. env = self.__kwargs.pop("env", None)
  54. if env:
  55. env = os.environ.update(env)
  56. cwd = self.__kwargs.pop("cwd", None)
  57. # self.__logger.log(" ".join(self.__args))
  58. self.__process = subprocess.Popen(self.__args,
  59. stdin=subprocess.PIPE,
  60. stdout=subprocess.PIPE,
  61. stderr=subprocess.PIPE, env=env,
  62. cwd=cwd)
  63. threads = [
  64. threading.Thread(target=Cmd.process, args=[self.__process.stdout,
  65. self.__logger.stdout]),
  66. threading.Thread(target=Cmd.process,
  67. args=[self.__process.stderr, self.__logger.stderr])
  68. ]
  69. stdin = self.__kwargs.get("stdin")
  70. if stdin:
  71. self.__process.stdin.write(stdin)
  72. [thread.start() for thread in threads]
  73. retcode = self.__process.wait()
  74. for thread in threads:
  75. thread.join()
  76. self.__process = None
  77. # self.__logger.log(" ".join(self.__args) + ": " + str(retcode))
  78. if retcode:
  79. raise subprocess.CalledProcessError(retcode, self.__args)
  80. def signal(self, signal=signal.SIGTERM):
  81. if self.__process:
  82. try:
  83. self.__process.send_signal(signal)
  84. except:
  85. pass
  86. @staticmethod
  87. def rsync(logger, *args):
  88. cmd = ["rsync", "-r", "--info=name", "--partial", "--inplace", "--delete",
  89. "--chmod=ug=rwX,o=rX",
  90. "-e", "ssh -o ControlMaster=no -o ControlPath=none"] \
  91. + list(args)
  92. return Cmd(logger, *cmd)
  93. class Loop(threading.Thread):
  94. def __init__(self, logger, cmd):
  95. super().__init__()
  96. self.__logger = logger
  97. # if not isinstance(cmd, Cmd):
  98. # cmd = Cmd(self.__logger, cmd)
  99. self.__cmd = cmd
  100. self.__current = None
  101. self.__event = threading.Event()
  102. self.__running = False
  103. def run(self):
  104. self.__running = True
  105. while self.__running:
  106. try:
  107. # self.__logger.stdout("Run")
  108. for cmd in self.__cmd:
  109. self.__current = cmd.run()
  110. self.__current = None
  111. # self.__logger.stdout("Sleep")
  112. self.__event.clear()
  113. self.__event.wait(1)
  114. # self.__logger.stdout("End sleep")
  115. except:
  116. pass
  117. # finally:
  118. # self.__logger.stdout("Runned")
  119. # self.__logger.stdout("Exit")
  120. def stop(self):
  121. if self.__running:
  122. # self.__logger.stdout("Stopping")
  123. self.__running = False
  124. if self.__current:
  125. self.__current.signal()
  126. self.__event.set()
  127. # self.__logger.stdout("Stopping ok")
  128. self.join()
  129. # self.__logger.stdout("Stopped")
  130. class Stream(Loop):
  131. def __init__(self, name, target):
  132. logger = Logger.get(name)
  133. folder = os.path.join("stream", name) + "/"
  134. target_stream = os.path.join(target, name) + "/"
  135. cmd1 = Cmd.rsync(logger, folder, target_stream)
  136. index = os.path.join("stream", name + ".m3u8")
  137. cmd2 = Cmd.rsync(logger, index, target)
  138. super().__init__(logger, [cmd1, cmd2])
  139. class Streams:
  140. NAMES = ["360p", "480p", "720p", "1080p", "audio"]
  141. # NAMES = ["1080p", "720p"]
  142. def __init__(self, target):
  143. self.__streams = []
  144. for name in Streams.NAMES:
  145. folder = os.path.join("stream", name)
  146. if Path(folder).is_dir():
  147. stream = Stream(name, target)
  148. self.__streams.append(stream)
  149. def start(self):
  150. for stream in self.__streams:
  151. stream.start()
  152. def stop(self):
  153. for stream in self.__streams:
  154. stream.stop()
  155. if __name__ == "__main__":
  156. lock = threading.Event()
  157. #streams = Streams("rabbit:/srv/www/fr.passageenseine/upstream/stream1")
  158. #streams = Streams("rabbit:/var/www/stream/stream1")
  159. streams = Streams("bespin:/srv/www/fr.imirhil/stream")
  160. def signal_handler(_1, _2):
  161. streams.stop()
  162. lock.set()
  163. signal.signal(signal.SIGINT, signal_handler)
  164. streams.start()
  165. lock.wait()