versions.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. #! /usr/bin/env python3
  2. from dotenv import load_dotenv
  3. load_dotenv()
  4. import os
  5. import struct
  6. import zipfile
  7. import oss2
  8. import json
  9. import requests
  10. from requests.exceptions import RequestException
# Switch to the project root directory (the parent of this script's folder)
  12. os.chdir(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  13. def get_chip_id_string(chip_id):
  14. return {
  15. 0x0000: "esp32",
  16. 0x0002: "esp32s2",
  17. 0x0005: "esp32c3",
  18. 0x0009: "esp32s3",
  19. 0x000C: "esp32c2",
  20. 0x000D: "esp32c6",
  21. 0x0010: "esp32h2",
  22. 0x0011: "esp32c5",
  23. 0x0012: "esp32p4",
  24. 0x0017: "esp32c5",
  25. }[chip_id]
  26. def get_flash_size(flash_size):
  27. MB = 1024 * 1024
  28. return {
  29. 0x00: 1 * MB,
  30. 0x01: 2 * MB,
  31. 0x02: 4 * MB,
  32. 0x03: 8 * MB,
  33. 0x04: 16 * MB,
  34. 0x05: 32 * MB,
  35. 0x06: 64 * MB,
  36. 0x07: 128 * MB,
  37. }[flash_size]
  38. def get_app_desc(data):
  39. magic = struct.unpack("<I", data[0x00:0x04])[0]
  40. if magic != 0xabcd5432:
  41. raise Exception("Invalid app desc magic")
  42. version = data[0x10:0x30].decode("utf-8").strip('\0')
  43. project_name = data[0x30:0x50].decode("utf-8").strip('\0')
  44. time = data[0x50:0x60].decode("utf-8").strip('\0')
  45. date = data[0x60:0x70].decode("utf-8").strip('\0')
  46. idf_ver = data[0x70:0x90].decode("utf-8").strip('\0')
  47. elf_sha256 = data[0x90:0xb0].hex()
  48. return {
  49. "name": project_name,
  50. "version": version,
  51. "compile_time": date + "T" + time,
  52. "idf_version": idf_ver,
  53. "elf_sha256": elf_sha256,
  54. }
  55. def get_board_name(folder):
  56. basename = os.path.basename(folder)
  57. if basename.startswith("v0.2"):
  58. return "bread-simple"
  59. if basename.startswith("v0.3") or basename.startswith("v0.4") or basename.startswith("v0.5") or basename.startswith("v0.6"):
  60. if "ML307" in basename:
  61. return "bread-compact-ml307"
  62. elif "WiFi" in basename:
  63. return "bread-compact-wifi"
  64. elif "KevinBox1" in basename:
  65. return "kevin-box-1"
  66. if basename.startswith("v0.7") or basename.startswith("v0.8") or basename.startswith("v0.9") or basename.startswith("v1."):
  67. return basename.split("_")[1]
  68. raise Exception(f"Unknown board name: {basename}")
  69. def read_binary(dir_path):
  70. merged_bin_path = os.path.join(dir_path, "merged-binary.bin")
  71. data = open(merged_bin_path, "rb").read()[0x100000:]
  72. if data[0] != 0xE9:
  73. print(dir_path, "is not a valid image")
  74. return
  75. # get flash size
  76. flash_size = get_flash_size(data[0x3] >> 4)
  77. chip_id = get_chip_id_string(data[0xC])
  78. # get segments
  79. segment_count = data[0x1]
  80. segments = []
  81. offset = 0x18
  82. for i in range(segment_count):
  83. segment_size = struct.unpack("<I", data[offset + 4:offset + 8])[0]
  84. offset += 8
  85. segment_data = data[offset:offset + segment_size]
  86. offset += segment_size
  87. segments.append(segment_data)
  88. assert offset < len(data), "offset is out of bounds"
  89. # extract bin file
  90. bin_path = os.path.join(dir_path, "xiaozhi.bin")
  91. if not os.path.exists(bin_path):
  92. print("extract bin file to", bin_path)
  93. open(bin_path, "wb").write(data)
  94. # The app desc is in the first segment
  95. desc = get_app_desc(segments[0])
  96. return {
  97. "chip_id": chip_id,
  98. "flash_size": flash_size,
  99. "board": get_board_name(dir_path),
  100. "application": desc,
  101. "firmware_size": len(data),
  102. }
  103. def extract_zip(zip_path, extract_path):
  104. if not os.path.exists(extract_path):
  105. os.makedirs(extract_path)
  106. print(f"Extracting {zip_path} to {extract_path}")
  107. with zipfile.ZipFile(zip_path, 'r') as zip_ref:
  108. zip_ref.extractall(extract_path)
  109. def upload_dir_to_oss(source_dir, target_dir):
  110. auth = oss2.Auth(os.environ['OSS_ACCESS_KEY_ID'], os.environ['OSS_ACCESS_KEY_SECRET'])
  111. bucket = oss2.Bucket(auth, os.environ['OSS_ENDPOINT'], os.environ['OSS_BUCKET_NAME'])
  112. for filename in os.listdir(source_dir):
  113. oss_key = os.path.join(target_dir, filename)
  114. print('uploading', oss_key)
  115. bucket.put_object(oss_key, open(os.path.join(source_dir, filename), 'rb'))
  116. def post_info_to_server(info):
  117. """
  118. 将固件信息发送到服务器
  119. Args:
  120. info: 包含固件信息的字典
  121. """
  122. try:
  123. # 从环境变量获取服务器URL和token
  124. server_url = os.environ.get('VERSIONS_SERVER_URL')
  125. server_token = os.environ.get('VERSIONS_TOKEN')
  126. if not server_url or not server_token:
  127. raise Exception("Missing SERVER_URL or TOKEN in environment variables")
  128. # 准备请求头和数据
  129. headers = {
  130. 'Authorization': f'Bearer {server_token}',
  131. 'Content-Type': 'application/json'
  132. }
  133. # 发送POST请求
  134. response = requests.post(
  135. server_url,
  136. headers=headers,
  137. json={'jsonData': json.dumps(info)}
  138. )
  139. # 检查响应状态
  140. response.raise_for_status()
  141. print(f"Successfully uploaded version info for tag: {info['tag']}")
  142. except RequestException as e:
  143. if hasattr(e.response, 'json'):
  144. error_msg = e.response.json().get('error', str(e))
  145. else:
  146. error_msg = str(e)
  147. print(f"Failed to upload version info: {error_msg}")
  148. raise
  149. except Exception as e:
  150. print(f"Error uploading version info: {str(e)}")
  151. raise
  152. def main():
  153. release_dir = "releases"
  154. # look for zip files startswith "v"
  155. for name in os.listdir(release_dir):
  156. if name.startswith("v") and name.endswith(".zip"):
  157. tag = name[:-4]
  158. folder = os.path.join(release_dir, tag)
  159. info_path = os.path.join(folder, "info.json")
  160. if not os.path.exists(info_path):
  161. if not os.path.exists(folder):
  162. os.makedirs(folder)
  163. extract_zip(os.path.join(release_dir, name), folder)
  164. info = read_binary(folder)
  165. target_dir = os.path.join("firmwares", tag)
  166. info["tag"] = tag
  167. info["url"] = os.path.join(os.environ['OSS_BUCKET_URL'], target_dir, "xiaozhi.bin")
  168. open(info_path, "w").write(json.dumps(info, indent=4))
  169. # upload all file to oss
  170. upload_dir_to_oss(folder, target_dir)
  171. # read info.json
  172. info = json.load(open(info_path))
  173. # post info.json to server
  174. post_info_to_server(info)
# Run the release-processing pipeline only when executed as a script.
if __name__ == "__main__":
    main()