使用 Airflow XCom 传输二进制数据
Apache Airflow 是一个强大的工作流编排工具,它允许您创建、安排和监控复杂的 ETL (提取、转换、加载) 工作流。XCom 是 Airflow 中一个关键的机制,用于在不同任务之间共享数据。通常情况下,我们使用 XCom 传输文本或 JSON 数据,但您也可以利用它传输二进制数据,例如图像、音频文件或其他二进制内容。
如何在 Airflow XCom 中保存二进制数据?
在 Airflow 中,XCom 存储在数据库中,默认情况下是 PostgreSQL 数据库。这意味着您需要将二进制数据转换为可存储在数据库中的格式。为此,您可以使用以下方法之一:
-
将二进制数据编码为 Base64 字符串: 这是一种常见的将二进制数据转换为文本格式的方法。您可以使用 Python 的
base64
模块进行编码和解码。import base64 def my_task(ti): # 读取二进制数据 with open("my_image.jpg", "rb") as f: image_data = f.read() # 将数据编码为 Base64 字符串 encoded_data = base64.b64encode(image_data).decode("utf-8") # 将编码后的数据存储到 XCom 中 ti.xcom_push(key="image_data", value=encoded_data)
-
将二进制数据序列化为 JSON: 您可以使用 Python 的
json
模块将二进制数据序列化为 JSON 格式。这需要将二进制数据转换为可表示的格式,例如将其转换为字符串。import json def my_task(ti): # 读取二进制数据 with open("my_audio.wav", "rb") as f: audio_data = f.read() # 将数据序列化为 JSON 格式 json_data = {"audio_data": audio_data.hex()} # 将数据存储到 XCom 中 ti.xcom_push(key="audio_data", value=json.dumps(json_data))
-
将二进制数据存储到文件并存储文件名: 您可以将二进制数据存储到临时文件,并将其文件名推送到 XCom 中。接收任务可以通过文件名检索二进制数据。
import os def my_task(ti): # 读取二进制数据 with open("my_file.bin", "rb") as f: data = f.read() # 创建临时文件并存储数据 temp_file = "/tmp/my_temp_file.bin" with open(temp_file, "wb") as f: f.write(data) # 将文件名存储到 XCom 中 ti.xcom_push(key="file_name", value=temp_file)
从 Airflow XCom 中检索二进制数据
在接收任务中,您可以使用 ti.xcom_pull()
方法从 XCom 中检索二进制数据。然后,您可以使用先前使用的编码或序列化方法反转数据并将其还原为原始格式。
例如,使用 Base64 编码检索二进制数据:
import base64
def my_downstream_task(ti):
# 从 XCom 中检索编码后的数据
encoded_data = ti.xcom_pull(task_ids="my_task", key="image_data")
# 将数据解码为二进制数据
decoded_data = base64.b64decode(encoded_data.encode("utf-8"))
# 使用解码后的数据
with open("received_image.jpg", "wb") as f:
f.write(decoded_data)
例如,使用 JSON 序列化检索二进制数据:
import json
def my_downstream_task(ti):
# 从 XCom 中检索序列化后的数据
json_data = ti.xcom_pull(task_ids="my_task", key="audio_data")
# 将数据反序列化为字典
data = json.loads(json_data)
# 提取二进制数据
audio_data = bytes.fromhex(data["audio_data"])
# 使用解码后的数据
with open("received_audio.wav", "wb") as f:
f.write(audio_data)
例如,使用文件路径检索二进制数据:
def my_downstream_task(ti):
# 从 XCom 中检索文件路径
file_name = ti.xcom_pull(task_ids="my_task", key="file_name")
# 读取二进制数据
with open(file_name, "rb") as f:
data = f.read()
# 使用解码后的数据
with open("received_file.bin", "wb") as f:
f.write(data)
总结
使用 Airflow XCom 传输二进制数据需要您将数据转换为可存储在数据库中的格式。您可以选择使用 Base64 编码、JSON 序列化或存储文件路径。选择哪种方法取决于您的具体需求和环境。在检索数据时,请记住使用与存储数据时相同的编码或序列化方法。
注意:
- 存储在 XCom 中的二进制数据的大小有限制,取决于您的 Airflow 安装配置和数据库设置。
- 如果您处理大量二进制数据,建议考虑使用其他方法,例如存储数据到文件系统或对象存储服务,并使用 XCom 传输文件路径。