Netty实现大文件分块传输详解

文章目录

  • 前言
  • Netty如何处理切分大文件的异步传输


前言

在Netty中,切分大文件进行传输主要利用ChunkedWriteHandler以及它的实现类,如ChunkedFile。这种机制允许你将大文件切分成多个小块(chunks),并通过Netty的pipeline进行异步发送。以下是使用Netty切分大文件进行传输的基本步骤:

  1. 设置Netty的ServerBootstrap或Bootstrap
    首先,你需要配置你的ServerBootstrap或Bootstrap,并添加ChunkedWriteHandler到你的pipeline中。ChunkedWriteHandler会处理所有ChunkedInputChunkedStream的写入。

  2. 创建ChunkedFile对象
    使用ChunkedFile类来创建一个代表大文件的ChunkedInput对象。你需要提供文件的路径,以及每个chunk的大小(或者默认使用ChunkedFile的默认chunk大小)。

  3. 写入Channel
    ChunkedFile对象写入Channel。由于ChunkedFile实现了ChunkedInput接口,Netty会自动处理文件的切分和发送。

下面是一个简单的示例代码:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedFile;

public class FileServer {

    private final int port;

    public FileServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     // 添加HTTP编解码器
                     p.addLast(new HttpServerCodec());
                     // 添加ChunkedWriteHandler,它负责处理ChunkedInput的写入
                     p.addLast(new ChunkedWriteHandler());
                     // 添加自定义的处理器
                     p.addLast(new SimpleChannelInboundHandler<FullHttpRequest>() {
                         @Override
                         protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
                             // 获取请求的路径
                             String uri = request.uri();
                             // 假设文件都在服务器的某个目录下
                             File file = new File("path/to/your/files" + uri);

                             if (file.exists()) {
                                 // 创建ChunkedFile对象
                                 ChunkedFile chunkedFile = new ChunkedFile(file);

                                 // 设置响应头信息
                                 HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                                 HttpHeaders headers = response.headers();
                                 headers.set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream");
                                 headers.set(HttpHeaderNames.CONTENT_LENGTH, file.length());

                                 // 写入响应
                                 ctx.write(response);
                                 // 写入文件内容
                                 ctx.writeAndFlush(chunkedFile);
                             } else {
                                 // 文件不存在时的处理
                                 HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
                                 ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
                             }
                         }
                     });
                 }
             });

            // 绑定端口并开始接收传入的连接
            ChannelFuture f = b.bind(port).sync();

            // 等待服务器套接字关闭
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        new FileServer(port).run();
    }
}

Netty如何处理切分大文件的异步传输

在Netty中处理切分大文件的异步传输通常涉及以下几个步骤:

  1. 读取和切分文件
    Netty提供了ChunkedFile类,它实现了ChunkedInput接口,用于异步地读取和发送文件内容。你可以创建一个ChunkedFile对象,并指定要传输的文件路径和每个chunk的大小。

  2. 写入Chunk到Channel
    ChunkedFile对象写入到Channel中。由于ChunkedFile实现了ChunkedInput接口,Netty会异步地处理文件的读取和发送。你可以使用ctx.writeAndFlush(chunkedFile)来将文件写入到Channel,并异步地发送数据。

  3. ChunkedWriteHandler 处理分块写操作
    ctx.writeAndFlush(chunkedFile)->AbstractChannelHandlerContext->writeAndFlush(Object msg)->write->invokeWriteAndFlush

   private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
       if (invokeHandler()) {
           invokeWrite0(msg, promise);
           invokeFlush0();
       } else {
           writeAndFlush(msg, promise);
       }
   }

invokeWrite0->ChunkedWriteHandler.write()

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        queue.add(new PendingWrite(msg, promise));
    }

invokeFlush0()->ChunkedWriteHandler.flush()->doFlush


    private void doFlush(final ChannelHandlerContext ctx) {
        final Channel channel = ctx.channel();
        if (!channel.isActive()) {
            discard(null);
            return;
        }

        boolean requiresFlush = true;
        ByteBufAllocator allocator = ctx.alloc();
        while (channel.isWritable()) {
            if (currentWrite == null) {
                currentWrite = queue.poll();
            }

            if (currentWrite == null) {
                break;
            }
            final PendingWrite currentWrite = this.currentWrite;
            final Object pendingMessage = currentWrite.msg;
            if (pendingMessage instanceof ChunkedInput) {
                final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
                boolean endOfInput;
                boolean suspend;
                Object message = null;
                try {
                    message = chunks.readChunk(allocator);
                    endOfInput = chunks.isEndOfInput();

                    if (message == null) {
                        // No need to suspend when reached at the end.
                        suspend = !endOfInput;
                    } else {
                        suspend = false;
                    }
                } catch (final Throwable t) {
                    this.currentWrite = null;
                    if (message != null) {
                        ReferenceCountUtil.release(message);
                    }
                    closeInput(chunks);
                    currentWrite.fail(t);
                    break;
                }

                if (suspend) {
                    // ChunkedInput.nextChunk() returned null and it has
                    // not reached at the end of input. Let's wait until
                    // more chunks arrive. Nothing to write or notify.
                    break;
                }

                if (message == null) {
                    // If message is null write an empty ByteBuf.
                    // See https://github.com/netty/netty/issues/1671
                    message = Unpooled.EMPTY_BUFFER;
                }

                ChannelFuture f = ctx.write(message);
                if (endOfInput) {
                    this.currentWrite = null;
                    // Register a listener which will close the input once the write is complete.
                    // This is needed because the Chunk may have some resource bound that can not
                    // be closed before its not written.
                    // See https://github.com/netty/netty/issues/303
                    f.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                closeInput(chunks);
                                currentWrite.fail(future.cause());
                            } else {
                                // read state of the input in local variables before closing it
                                long inputProgress = chunks.progress();
                                long inputLength = chunks.length();
                                closeInput(chunks);
                                currentWrite.progress(inputProgress, inputLength);
                                currentWrite.success(inputLength);
                            }
                        }
                    });
                } else if (channel.isWritable()) {
                    f.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                closeInput(chunks);
                                currentWrite.fail(future.cause());
                            } else {
                                currentWrite.progress(chunks.progress(), chunks.length());
                            }
                        }
                    });
                } else {
                    f.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                closeInput(chunks);
                                currentWrite.fail(future.cause());
                            } else {
                                currentWrite.progress(chunks.progress(), chunks.length());
                                if (channel.isWritable()) {
                                    resumeTransfer();
                                }
                            }
                        }
                    });
                }
                // Flush each chunk to conserve memory
                ctx.flush();
                requiresFlush = false;
            }
      }
    }
  1. 处理写入完成事件
    你可以通过监听ChannelFuture的完成事件来确定文件是否已经完全发送。当writeAndFlush方法返回的ChannelFuture完成时,表示数据已经被写入到底层的传输层,并且可以被远程客户端接收。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/548631.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++ 为什么不能在构造函数中调用虚函数

最近在Clion编辑器中看到构造函数中调用虚函数提示&#xff1a; Do not invoke virtual member functions from constructor 这里记录一下为什么不能在构造函数中调用虚函数。 #include <iostream> #include <string>using namespace std;class BaseClass {publi…

大模型时代:普通人该如何获利?

随着科技的飞速发展&#xff0c;我们正处在一个大模型的时代。所谓大模型&#xff0c;就是指那些拥有数十亿、甚至千亿参数的深度学习模型。这些大模型的出现&#xff0c;不仅推动了人工智能技术的进步&#xff0c;也为普通人创造了众多的获利机会。那么&#xff0c;在这个大模…

【Java开发指南 | 第六篇】Java成员变量(实例变量)、 类变量(静态变量)

读者可订阅专栏&#xff1a;Java开发指南 |【CSDN秋说】 文章目录 成员变量&#xff08;实例变量&#xff09;类变量&#xff08;静态变量&#xff09;定义方式静态变量的使用场景 成员变量&#xff08;实例变量&#xff09; 成员变量声明在一个类中&#xff0c;但在方法、构造…

GAMS104 现代游戏引擎 2

渲染的难点可以分为一下三部分&#xff1a;如何计算入射光线、如何考虑材质以及如何实现全局光照。 渲染的难点之一在于阴影&#xff0c;或者说是光的可见性。如何做出合适的阴影效果远比想象中要难得多&#xff0c;在实践中往往需要通过大量的技巧才能实现符合人认知的阴影效…

AI数字人对话之RealChar框架源码解读

零.功能介绍 与虚拟角色(非形象)进行文本或语音会话 体验地址:RealChar. 代码库:GitHub - Shaunwei/RealChar: 🎙️🤖Create, Customize and Talk to your AI Character/Companion in Realtime (All in One Codebase!). Have a natural seamless conversation with AI…

3.3 Ax=b 的完全解

一、Ax b 在求解 A x 0 A\boldsymbol x\boldsymbol 0 Ax0 时&#xff0c;我们将其转化成 R x 0 R\boldsymbol x\boldsymbol 0 Rx0&#xff0c;将自由变量赋予特殊值&#xff08;1 或 0&#xff09;&#xff0c;主元变量即可通过回代求出。这个过程中我们没有关注右侧的 …

基于SpringBoot+Vue的在线教育系统(源码+文档+包运行)

一.系统概述 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了在线教育系统的开发全过程。通过分析在线教育系统管理的不足&#xff0c;创建了一个计算机管理在线教育系统的方案。文章介绍了在线教育系统的系统分析部…

Python基于Django的微博热搜、微博舆论可视化系统

博主介绍&#xff1a;✌IT徐师兄、7年大厂程序员经历。全网粉丝15W、csdn博客专家、掘金/华为云//InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&#x1f3…

【SLAM】在Win10上实现Nerf-Pytorch【GPU版】

文章目录 ReadMe安装依赖运行下载两个示例数据集:lego和fern训练一个低分辨率的Lego NeRF:训练一个低分辨率蕨类植物NeRF:更多数据集预训练模型可复现实现1、下载nerf-pytorch工程2、安装依赖3、下载数据4、运行lego NeRF:ReadMe Github链接 NeRF (神经辐射场)是一种在合成…

UE5 C++ 创建3DWidgete 血条 再造成伤害

一&#xff0e;创建 二&#xff0e;&#xff35;&#xff29;里声明变量 创建类 public:UPROPERTY(EditAnywhere,BlueprintReadWrite,Category "MyWidget")float CurrentHealth 100.0f;UPROPERTY(EditAnywhere,BlueprintReadWrite,Category "MyWidget"…

代码随想录算法训练营DAY24|C++回溯算法Part.1|回溯算法理论基础、77.组合、组合问题的剪枝操作

文章目录 回溯算法如何理解回溯算法回溯法模版回溯算法模版框架 77.组合树形结构回溯三部曲伪代码CPP代码实现 组合问题的剪枝操作 回溯算法 如何理解回溯算法 回溯法解决的问题都可以抽象为树形结构。 因为回溯法解决的都是在集合中递归查找子集&#xff0c;集合的大小就构成…

mysql面试题 二

超键、候选键、主键、外键分别是什么&#xff1f; 超键&#xff1a;在关系中能唯一标识元组的属性集称为关系模式的超键。一个属性可以为作为一个超键&#xff0c;多个属性组合在一起也可以作为一个超键。超键包含候选键和主键。候选键&#xff1a;是最小超键&#xff0c;即没…

【Altium Designer 20 笔记】PCB板框

Altium Designer中设置PCB板框 PCB板框位于Mechanical1层 点击放置中的线条或使用其他绘图工具来绘制板框, 可以绘制矩形、圆形或其他形状的板框,确保板框是闭合的 注意&#xff1a;在绘制板框时&#xff0c;确保线条的起点和终点相连&#xff0c;形成一个闭合的图形。 快捷键D…

【C++航海王:追寻罗杰的编程之路】异常——错误处理方式之一

目录 引言 1 -> C语言传统的处理错误的方式 2 -> C异常概念 3 -> 异常的使用 3.1 -> 异常的抛出和捕获 3.2 -> 异常的重新抛出 3.3 -> 异常规范 4 -> 自定义异常体系 5 -> C标准库的异常体系 6 -> 异常的优缺点 引言 在C编程中&#xff…

C++ | Leetcode C++题解之第32题最长有效括号

题目&#xff1a; 题解&#xff1a; class Solution { public:int longestValidParentheses(string s) {int left 0, right 0, maxlength 0;for (int i 0; i < s.length(); i) {if (s[i] () {left;} else {right;}if (left right) {maxlength max(maxlength, 2 * ri…

单细胞分析|映射和注释查询数据集

reference映射简介 在本文中&#xff0c;我们首先构建一个reference&#xff0c;然后演示如何利用该reference来注释新的查询数据集。生成后&#xff0c;该reference可用于通过cell类型标签传输和将查询cell投影到reference UMAP 等任务来分析其他查询数据集。值得注意的是&am…

做一个好的程序员难吗?只需要这10个习惯

在这个世界上&#xff0c;有数以百万计的人对软件开发充满热情&#xff0c;他们有很多名字&#xff0c;如软件工程师、程序员、编码员、开发人员。一段时间后&#xff0c;这些人可能会成为一名优秀的编码员&#xff0c;并且他们将非常熟悉如何使用计算机语言完成工作。但是&…

【LeetCode】 2724. 排序方式

排序方式 给定一个数组 arr 和一个函数 fn&#xff0c;返回一个排序后的数组 sortedArr。你可以假设 fn 只返回数字&#xff0c;并且这些数字决定了 sortedArr 的排序顺序。sortedArr 必须按照 fn 的输出值 升序 排序。 你可以假设对于给定的数组&#xff0c;fn 不会返回重复的…

记录Python链接mysql的数据库的2种操作方式

一、使用pymysql库方式 import pymysqldb pymysql.connect(hostlocalhost,userroot,password123456) #创建链接&#xff0c;在3.8以后好像已经不支持这个种链接方式了&#xff0c; #db pymysql.connect(localhost,root,123456) cursor db.cursor()#拿到游标这样我们就拿到了…

DataX-Web,介绍-安装-部署-启动

使用文档&#xff1a;GitHub - WeiYe-Jing/datax-web: DataX集成可视化页面 目录 1、DataX-Web介绍 2、DataX-Web部署 3、DataX-Web启动命令 1、DataX-Web介绍 GitHub - WeiYe-Jing/datax-web&#xff1a;DataX集成可视化页面&#xff0c;选择数据源即可一键生成数据同步任务…
最新文章