電動車增長 高盛估全球石油需求最快2024年觸頂

路透社7月25日報導,高盛(Goldman Sachs)最新報告表示,受到汽車燃油效率提高、電動車產業快速發展、經濟增長低迷以及高油價等因素的影響,全球石油需求最快可能在2024年就會到達高峰。報告預估,全球的電動車市場將從2016年的200萬輛,爆發增長至2030年的8,300萬輛。全球石油需求的年均增長率將從2011到2016年期間的1.6%,降至2017到2022年的1.2%,至2025年降至0.7%,2030年降至0.4%。

高盛表示,從現在起到2030年,運輸部門對石油需求增長的貢獻將會逐步下滑,石化產業的需求將取而代之並躍居主流。報告也預估未來五年油品的供應將會出現過剩,因煉油產能持續增加但需求增長放緩的影響,這會使得全球煉油廠的產能利用率下滑,並壓縮煉油廠的毛利。此外,由於越來越多的石化原料來自煉油體系之外(如天然氣凝析油等),煉油廠石油需求的佔比也將會下滑。

亞洲三大石油消費國中國大陸、印度以及日本的需求增長疲弱,將令油市重新恢復平衡的時間拉長。大陸、印度以及日本合計佔全球石油需求的20%比重,但各自面臨不同的困難,使得石油需求的增長疲弱。其中,日本受困於人口老化以及汽車燃油效率的持續提高,印度因去年底去貨幣化的政策衝擊需求,而中國大陸正積極去化過剩煉油產能,將會影響到原油的需求。

英國石油公司(BP PLC)發布的《世界能源統計年鑑》表示,2016年,全球能源需求增長1%,連續第三年呈現疲弱的增長態勢(2014與2015年分別年增1%與0.9%),相比過去十年的平均增長率為1.8%。主要的增長來自於中國大陸與印度,其中印度2016年能源需求年增5.4%,增速與過去幾年相符。大陸去年能源需求年增1.3%,與2015年的1.2%增幅相近,但只有過去十年平均增速的四分之一,並寫下1997-98年亞洲金融風暴以來的連續兩年最低增速。

BP年鑑指出,2016年,石油消費佔全球能源消費的三分之一比重,全球石油需求年增1.6%或每日160萬桶,此高於過去十年的平均增速(1.2%)。其中,大陸石油需求年增每日40萬桶,印度以及歐洲的石油需求均年增每日30萬桶。2016年,全球石油日產量僅年增40萬桶,則是創下2013年以來的最低增長;其中,中東的石油日產量年增170萬桶,但中東以外的石油日產量則是年減130萬桶。

(本文內容由授權使用。圖片出處:public domain CC0)

 

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

Gogoro 太陽能電池交換站,在新北八里驅動示範

新創科技公司 Gogoro 睿能創意股份有限公司,28 日宣布其與新北市政府合作的首座太陽能電池交換站,正式在新北市八里區開始營運。隨著這個搭配太陽能面板的電池交換站的啟動,Gogoro 對永續能源的承諾邁出更關鍵的一步。   這座位於八里十三行博物館附近(新北市八里區文昌路與文昌一街交叉口)的 Gogoro 八里公兒四電池交換站配置 2.3kW 太陽能面板,依據天氣環境等因素,每天可以產生大約 6.21 kWH 的電力,是第一座以再生能源提供電力的 GoStation 電池交換站。太陽能電力不但可以提供電池交換站的 Gogoro 電池使用,降低發電時所產生的二氧化碳排放,更可以與全台電力網路連結,參與整體網路的電量調節。   Gogoro 執行長陸學森表示 :「Gogoro 致力發展潔淨的智慧能源,希望透過具備能源調度能力的智慧電網,成為城市的電力調節樞紐,以促成電力平衡。與新北市政府合作的太陽能八里電池交換示範站為我們上述的目標邁出了重要的一步。」   新北市政府則表示,市府一直在積極尋找降低溫室氣體排放、改善空氣污染的積極方法,與 Gogoro 合作成立全世界第一座太陽能電池交換站具有標竿式的意義。電動機車零排放的特性不但可以提升市區的空氣品質,以再生能源為來源的電力,更可以降低交通工具所帶來的碳排放。未來也將持續努力發展以再生能源補充電力的相關設施,做為新北市重要的基礎設施。   這座太陽能電池交換示範站,設有物聯網智慧平台,透過分析供電情況的螢幕,說明了包括減少碳排量,減少樹木砍伐面積,綠能總儲電量,城市電網 ,太陽能發電量等訊息,讓每名換電的民眾,清楚的知道,自己對環境的貢獻度,也具有相當程度的教育意義。   Gogoro 目前擁有 25,000 名車主,總共累計將近 500 萬次的電池交換,9,000 萬公里的總里程數,已經替地球減少 720 萬公斤的二氧化碳排放,隨著未來再生能源比例逐漸提升,Gogoro 的車主們將更對地球與環境產生更多的正面影響力。Gogoro 為鼓勵車主響應綠能電池交換站,也設計了特別的機制。凡是造訪八里公兒四站,並完成電池交換的車主,即可在 Gogoro App 上獲得「阿波羅的力量」徽章。   (合作媒體:。圖片出處:科技新報)  

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

動手造輪子:實現簡單的 EventQueue

動手造輪子:實現簡單的 EventQueue

Intro

最近項目里有遇到一些併發的問題,想實現一個隊列來將併發的請求一個一個串行處理,可以理解為使用消息隊列處理併發問題,之前實現過一個簡單的 EventBus,於是想在 EventBus 的基礎上改造一下,加一個隊列,改造成類似消息隊列的處理模式。消息的處理(Consumer)直接使用 .netcore 里的 IHostedService 來實現了一個簡單的後台任務處理。

初步設計

  • Event 抽象的事件
  • EventHandler 處理 Event 的方法
  • EventStore 保存訂閱 Event 的 EventHandler
  • EventQueue 保存 Event 的隊列
  • EventPublisher 發布 Event
  • EventConsumer 處理 Event 隊列里的 Event
  • EventSubscriptionManager 管理訂閱 Event 的 EventHandler

實現代碼

EventBase 定義了基本事件信息,事件發生時間以及事件的id:

public abstract class EventBase
{
    [JsonProperty]
    public DateTimeOffset EventAt { get; private set; }

    [JsonProperty]
    public string EventId { get; private set; }

    protected EventBase()
    {
      this.EventId = GuidIdGenerator.Instance.NewId();
      this.EventAt = DateTimeOffset.UtcNow;
    }

    [JsonConstructor]
    public EventBase(string eventId, DateTimeOffset eventAt)
    {
      this.EventId = eventId;
      this.EventAt = eventAt;
    }
}

EventHandler 定義:

public interface IEventHandler
{
    Task Handle(IEventBase @event);
}

public interface IEventHandler<in TEvent> : IEventHandler where TEvent : IEventBase
{
    Task Handle(TEvent @event);
}

public class EventHandlerBase<TEvent> : IEventHandler<TEvent> where TEvent : EventBase
{
    public virtual Task Handle(TEvent @event)
    {
        return Task.CompletedTask;
    }

    public Task Handle(IEventBase @event)
    {
        return Handle(@event as TEvent);
    }
}

EventStore:

public class EventStore
{
    private readonly Dictionary<Type, Type> _eventHandlers = new Dictionary<Type, Type>();

    public void Add<TEvent, TEventHandler>() where TEventHandler : IEventHandler<TEvent> where TEvent : EventBase
    {
        _eventHandlers.Add(typeof(TEvent), typeof(TEventHandler));
    }

    public object GetEventHandler(Type eventType, IServiceProvider serviceProvider)
    {
        if (eventType == null || !_eventHandlers.TryGetValue(eventType, out var handlerType) || handlerType == null)
        {
            return null;
        }
        return serviceProvider.GetService(handlerType);
    }

    public object GetEventHandler(EventBase eventBase, IServiceProvider serviceProvider) =>
        GetEventHandler(eventBase.GetType(), serviceProvider);

    public object GetEventHandler<TEvent>(IServiceProvider serviceProvider) where TEvent : EventBase =>
        GetEventHandler(typeof(TEvent), serviceProvider);
}

EventQueue 定義:

public class EventQueue
{
    private readonly ConcurrentDictionary<string, ConcurrentQueue<EventBase>> _eventQueues =
        new ConcurrentDictionary<string, ConcurrentQueue<EventBase>>();

    public ICollection<string> Queues => _eventQueues.Keys;

    public void Enqueue<TEvent>(string queueName, TEvent @event) where TEvent : EventBase
    {
        var queue = _eventQueues.GetOrAdd(queueName, q => new ConcurrentQueue<EventBase>());
        queue.Enqueue(@event);
    }

    public bool TryDequeue(string queueName, out EventBase @event)
    {
        var queue = _eventQueues.GetOrAdd(queueName, q => new ConcurrentQueue<EventBase>());
        return queue.TryDequeue(out @event);
    }

    public bool TryRemoveQueue(string queueName)
    {
        return _eventQueues.TryRemove(queueName, out _);
    }

    public bool ContainsQueue(string queueName) => _eventQueues.ContainsKey(queueName);

    public ConcurrentQueue<EventBase> this[string queueName] => _eventQueues[queueName];
}

EventPublisher:

public interface IEventPublisher
{
    Task Publish<TEvent>(string queueName, TEvent @event)
        where TEvent : EventBase;
}
public class EventPublisher : IEventPublisher
{
    private readonly EventQueue _eventQueue;

    public EventPublisher(EventQueue eventQueue)
    {
        _eventQueue = eventQueue;
    }

    public Task Publish<TEvent>(string queueName, TEvent @event)
        where TEvent : EventBase
    {
        _eventQueue.Enqueue(queueName, @event);
        return Task.CompletedTask;
    }
}

EventSubscriptionManager:

public interface IEventSubscriptionManager
{
    void Subscribe<TEvent, TEventHandler>()
        where TEvent : EventBase
        where TEventHandler : IEventHandler<TEvent>;
}

public class EventSubscriptionManager : IEventSubscriptionManager
{
    private readonly EventStore _eventStore;

    public EventSubscriptionManager(EventStore eventStore)
    {
        _eventStore = eventStore;
    }

    public void Subscribe<TEvent, TEventHandler>()
        where TEvent : EventBase
        where TEventHandler : IEventHandler<TEvent>
    {
        _eventStore.Add<TEvent, TEventHandler>();
    }
}

EventConsumer:

public class EventConsumer : BackgroundService
{
    private readonly EventQueue _eventQueue;
    private readonly EventStore _eventStore;
    private readonly int maxSemaphoreCount = 256;
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger _logger;

    public EventConsumer(EventQueue eventQueue, EventStore eventStore, IConfiguration configuration, ILogger<EventConsumer> logger, IServiceProvider serviceProvider)
    {
        _eventQueue = eventQueue;
        _eventStore = eventStore;
        _logger = logger;
        _serviceProvider = serviceProvider;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using (var semaphore = new SemaphoreSlim(Environment.ProcessorCount, maxSemaphoreCount))
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var queues = _eventQueue.Queues;
                if (queues.Count > 0)
                {
                    await Task.WhenAll(
                    queues
                        .Select(async queueName =>
                        {
                            if (!_eventQueue.ContainsQueue(queueName))
                            {
                                return;
                            }
                            try
                            {
                                await semaphore.WaitAsync(stoppingToken);
                                //
                                if (_eventQueue.TryDequeue(queueName, out var @event))
                                {
                                    var eventHandler = _eventStore.GetEventHandler(@event, _serviceProvider);
                                    if (eventHandler is IEventHandler handler)
                                    {
                                        _logger.LogInformation(
                                            "handler {handlerType} begin to handle event {eventType}, eventId: {eventId}, eventInfo: {eventInfo}",
                                            eventHandler.GetType().FullName, @event.GetType().FullName,
                                            @event.EventId, JsonConvert.SerializeObject(@event));

                                        try
                                        {
                                            await handler.Handle(@event);
                                        }
                                        catch (Exception e)
                                        {
                                            _logger.LogError(e, "event  {eventId}  handled exception", @event.EventId);
                                        }
                                        finally
                                        {
                                            _logger.LogInformation("event {eventId} handled", @event.EventId);
                                        }
                                    }
                                    else
                                    {
                                        _logger.LogWarning(
                                            "no event handler registered for event {eventType}, eventId: {eventId}, eventInfo: {eventInfo}",
                                            @event.GetType().FullName, @event.EventId,
                                            JsonConvert.SerializeObject(@event));
                                    }
                                }
                            }
                            catch (Exception ex)
                            {
                                _logger.LogError(ex, "error running EventConsumer");
                            }
                            finally
                            {
                                semaphore.Release();
                            }
                        })
                );
                }

                await Task.Delay(50, stoppingToken);
            }
        }
    }
}

為了方便使用定義了一個 Event 擴展方法:

public static IServiceCollection AddEvent(this IServiceCollection services)
{
    services.TryAddSingleton<EventStore>();
    services.TryAddSingleton<EventQueue>();
    services.TryAddSingleton<IEventPublisher, EventPublisher>();
    services.TryAddSingleton<IEventSubscriptionManager, EventSubscriptionManager>();

    services.AddSingleton<IHostedService, EventConsumer>();
    return services;
}

使用示例

定義 PageViewEvent 記錄請求信息:

public class PageViewEvent : EventBase
{
    public string Path { get; set; }
}

這裏作為示例只記錄了請求的Path信息,實際使用可以增加更多需要記錄的信息

定義 PageViewEventHandler,處理 PageViewEvent

public class PageViewEventHandler : EventHandlerBase<PageViewEvent>
{
    private readonly ILogger _logger;

    public PageViewEventHandler(ILogger<PageViewEventHandler> logger)
    {
        _logger = logger;
    }

    public override Task Handle(PageViewEvent @event)
    {
        _logger.LogInformation($"handle pageViewEvent: {JsonConvert.SerializeObject(@event)}");
        return Task.CompletedTask;
    }
}

這個 handler 里什麼都沒做只是輸出一個日誌

這個示例項目定義了一個記錄請求路徑的事件以及一個發布請求記錄事件的中間件

// 發布 Event 的中間件
app.Use(async (context, next) =>
{
    var eventPublisher = context.RequestServices.GetRequiredService<IEventPublisher>();
    await eventPublisher.Publish("pageView", new PageViewEvent() { Path = context.Request.Path.Value });
    await next();
});

Startup 配置:

public void ConfigureServices(IServiceCollection services)
{
    // ...
    services.AddEvent();
    services.AddSingleton<PageViewEventHandler>();// 註冊 Handler
}

// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env, IEventSubscriptionManager eventSubscriptionManager)
{
    eventSubscriptionManager.Subscribe<PageViewEvent, PageViewEventHandler>();
    app.Use(async (context, next) =>
    {
        var eventPublisher = context.RequestServices.GetRequiredService<IEventPublisher>();
        await eventPublisher.Publish("pageView", new PageViewEvent() { Path = context.Request.Path.Value });
        await next();
    });
    // ...
}

使用效果:

More

注:只是一個初步設計,基本可以實現功能,還是有些不足,實際應用的話還有一些要考慮的事情

  1. Consumer 消息邏輯,現在的實現有些問題,我們的應用場景目前比較簡單還可以滿足,如果事件比較多就會而且每個事件可能處理需要的時間長短不一樣,會導致在一個批次中執行的 Event 中已經完成的事件要等待其他還沒完成的事件完成之後才能繼續取下一個事件,理想的消費模式應該是各個隊列相互獨立,在同一個隊列中保持順序消費即可
  2. 上面示例的 EventStore 的實現只是簡單的實現了一個事件一個 Handler 的處理情況,實際業務場景中很可能會有一個事件需要多個 Handler 的情況
  3. 這個實現是基於內存的,如果要在分佈式場景下使用就不適用了,需要自己實現一下基於redis或者數據庫的以滿足分佈式的需求
  4. and more…

上面所有的代碼可以在 Github 上獲取,示例項目 Github 地址:

Reference

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

AutoCad 二次開發 文字鏡像

AutoCad 二次開發 文字鏡像

參考: 在autocad中如果使用Mirror命令把塊參照給鏡像了(最終得到一個對稱的塊),塊裏面的文字包括DBText和MText以及標註上面的文字都會被對稱,變得不易閱讀。而在單個字體實體和標註實體鏡像的時候只要設置系統變量mirrtext為0鏡像后的文字就不會與原文字對稱變成我們未學習過的文字了。   所以我們在鏡像塊的時候就可以先把塊炸開是用快捷鍵X,或者輸入explode,然後在使用鏡像命令。之後在把對稱后的實體集合組成一個新的塊。不過這樣操作十分的繁瑣,我覺得其中這樣做的優勢是mirror時的jig操作可以很方便的預先知道我們想要的對稱后的結果。但如果用代碼實現這種jig操作,我覺得有點複雜,目前我還不知道怎麼實現。   我要講的主要就是用代碼來實現塊的鏡像。難點就在與文字的鏡像,和標註的鏡像。這篇文章先講文字的鏡像。文字鏡像的主要步驟分為: 1.找到鏡像前文字邊界的四個角,這四個角構成了一個矩形,我們要求得這個矩形的長和寬所代表的向量。 2.判斷文字鏡像后的方向,如果是偏向朝Y軸鏡像,那麼文字鏡像后的方向是沿着X軸翻轉的,如果是偏向朝X軸鏡像,那麼文字鏡像后的方向是沿着X軸翻轉的。這裏我以沿着Y軸鏡像為例子。 3.移動鏡像后切被翻轉后的文字,這裏也是根據鏡像軸的不同,需按不同的向量來移動。   詳細情況見圖: 圖中左邊是要鏡像的文字,文字上的藍色線,和黃色線是我調試的時候加入的,黃線左端是 pt1,右端是pt2,藍線左端是pt3,右端是pt4。 中間的豎線是Y軸鏡像線,右邊就是不同情況下鏡像后的文字。其中黃色部分表示正確的鏡像結果,紅色部分表示:鏡像后延第一個步驟移動后求得的向量移動了文字的position但是沒翻轉的結果。黑色部分表示:鏡像后翻轉了文字但文字的position沒有按向量移動的結果。 下面我就來仔細分析一下代碼: 要實現第一步驟,前提是要有一段P/Invoke的代碼: 其中 引入的acdb22.dll是 autocad2018中的版本,不同版本,這個dll後面的数字不一樣。我們可以到cad安裝目錄下查找acdb幾個字,找到後面帶数字的就是了,64位的安裝目錄默認位置:C:\Program Files\Autodesk\AutoCAD 2018。這兩個函數一個是32位,一個是64位,具體用哪個後面的代碼會自動判斷。這個函數作用我覺得主要是求 這個name。   這裏用到了accore.dll,有的cad版本沒有這個dll,就用acad.exe代替就可以了。上面的acdbEntGet主要是根據entity的名字求的entity實體的Intptr,下面的函數時求的文字邊界對角點,這裏注意,我把這個兩個點用直線打印在cad空間里,發現它時在原點,沒旋轉的線,但其實文字不的position不在原點,也帶有旋轉角度。後面要求的文字邊界向量就是根據這兩個點來的。 上面求得的pt1,pt2 經過:
pt1 = pt1.TransformBy(rotMat).Add(dbText.Position.GetAsVector());
pt2 = pt2.TransformBy(rotMat).Add(dbText.Position.GetAsVector()); 這種操作就得到了第一幅圖中的黃線。 在經過這樣的操作,得到的pt3 和pt4就是第一幅圖的藍線。這其中的rotDir和linDir就是我們要求得的寬和長代表的向量了,然後在把它給鏡像了得到的mirRotDir和mirLinDir就是鏡像后的文字要移動的向量了,這裏第一步就結束了。 第二步,第三步:   大的話,就說明文字需要朝X軸翻轉,所以這裏的IsMirroredInX=true就代表需要朝X軸翻轉。 緊接着下面句,如果沒加mirLineDir這個向量,就會出現第一幅圖中的畫黑線的情況,如果不加IsMirrorInX就會出現畫紅線的情況。 到這裏就全部結束了。 下面給出所有代碼:

public class MyMirror
    {
        Document Doc = Application.DocumentManager.MdiActiveDocument;
        Editor Ed = Application.DocumentManager.MdiActiveDocument.Editor;
        Database Db = Application.DocumentManager.MdiActiveDocument.Database;

        List<Entity> list = new List<Entity>();
        List<ObjectId> listOId = new List<ObjectId>();

        [CommandMethod("testM")]

        public void MirrorTextCmd()

        {

            Document doc = Application.DocumentManager.MdiActiveDocument;

            Database db = doc.Database;

            Editor ed = doc.Editor;



            //Entity selection

            PromptEntityOptions peo = new PromptEntityOptions(

                "\nSelect a text entity:");



            peo.SetRejectMessage("\nMust be text entity...");

            peo.AddAllowedClass(typeof(DBText), true);



            PromptEntityResult perText = ed.GetEntity(peo);



            if (perText.Status != PromptStatus.OK)

                return;



            peo = new PromptEntityOptions("\nSelect a mirror line:");

            peo.SetRejectMessage("\nMust be a line entity...");

            peo.AddAllowedClass(typeof(Line), true);



            PromptEntityResult perLine = ed.GetEntity(peo);



            if (perLine.Status != PromptStatus.OK)

                return;



            using (Transaction tr = db.TransactionManager.StartTransaction())

            {

                Line line = tr.GetObject(perLine.ObjectId, OpenMode.ForRead)

                    as Line;



                Line3d mirrorLine = new Line3d(

                    line.StartPoint,

                    line.EndPoint);



                MirrorText(perText.ObjectId, mirrorLine);



                tr.Commit();

            }

        }



        void MirrorText(ObjectId oId, Line3d mirrorLine)

        {

            Database db = oId.Database;



            using (Transaction tr = db.TransactionManager.StartTransaction())

            {

                // Get text entity

                DBText dbText = tr.GetObject(oId, OpenMode.ForRead)

                    as DBText;



                // Clone original entity

                DBText mirroredTxt = dbText.Clone() as DBText;



                // Create a mirror matrix

                Matrix3d mirrorMatrix = Matrix3d.Mirroring(mirrorLine);



                // Do a geometric mirror on the cloned text

                mirroredTxt.TransformBy(mirrorMatrix);



                // Get text bounding box

                Point3d pt1, pt2, pt3, pt4;

                GetTextBoxCorners(

                    dbText,

                    out pt1,

                    out pt2,

                    out pt3,

                    out pt4);



                // Get the perpendicular direction to the original text

                Vector3d rotDir =

                    pt4.Subtract(pt1.GetAsVector()).GetAsVector();



                // Get the colinear direction to the original text

                Vector3d linDir =

                    pt3.Subtract(pt1.GetAsVector()).GetAsVector();



                // Compute mirrored directions

                Vector3d mirRotDir = rotDir.TransformBy(mirrorMatrix);

                Vector3d mirLinDir = linDir.TransformBy(mirrorMatrix);



                //Check if we need to mirror in Y or in X

                if (Math.Abs(mirrorLine.Direction.Y) >

                    Math.Abs(mirrorLine.Direction.X))

                {

                    // Handle the case where text is mirrored twice

                    // instead of doing "oMirroredTxt.IsMirroredInX = true"

                    mirroredTxt.IsMirroredInX = !mirroredTxt.IsMirroredInX;

                    mirroredTxt.Position = mirroredTxt.Position + mirLinDir;

                }

                else

                {

                    mirroredTxt.IsMirroredInY = !mirroredTxt.IsMirroredInY;

                    mirroredTxt.Position = mirroredTxt.Position + mirRotDir;

                }



                // Add mirrored text to database

                //btr.AppendEntity(mirroredTxt);

                //tr.AddNewlyCreatedDBObject(mirroredTxt, true);

                //list.Add(mirroredTxt);
                mirroredTxt.ToSpace();
                tr.Commit();

            }

        }
        #region p/Invoke


        public struct ads_name
        {

            public IntPtr a;

            public IntPtr b;

        };



        // Exported function names valid only for R19



        [DllImport("acdb22.dll",

            CallingConvention = CallingConvention.Cdecl,

            EntryPoint = "?acdbGetAdsName@@YA?AW4ErrorStatus@Acad@@AAY01JVAcDbObjectId@@@Z")]

        public static extern int acdbGetAdsName32(

            ref ads_name name,

            ObjectId objId);



        [DllImport("acdb22.dll",

            CallingConvention = CallingConvention.Cdecl,

            EntryPoint = "?acdbGetAdsName@@YA?AW4ErrorStatus@Acad@@AEAY01_JVAcDbObjectId@@@Z")]

        public static extern int acdbGetAdsName64(

            ref ads_name name,

            ObjectId objId);



        public static int acdbGetAdsName(ref ads_name name, ObjectId objId)

        {

            if (Marshal.SizeOf(IntPtr.Zero) > 4)

                return acdbGetAdsName64(ref name, objId);



            return acdbGetAdsName32(ref name, objId);

        }



        [DllImport("accore.dll",

            CharSet = CharSet.Unicode,

            CallingConvention = CallingConvention.Cdecl,

            EntryPoint = "acdbEntGet")]

        public static extern System.IntPtr acdbEntGet(

            ref ads_name ename);



        [DllImport("accore.dll",

            CharSet = CharSet.Unicode,

            CallingConvention = CallingConvention.Cdecl,

            EntryPoint = "acedTextBox")]

        public static extern System.IntPtr acedTextBox(

            IntPtr rb,

            double[] point1,

            double[] point2);



        void GetTextBoxCorners(DBText dbText, out Point3d pt1, out Point3d pt2, out Point3d pt3, out Point3d pt4)

        {

            ads_name name = new ads_name();



            int result = acdbGetAdsName(

                ref name,

                dbText.ObjectId);



            ResultBuffer rb = new ResultBuffer();



            Interop.AttachUnmanagedObject(

                rb,

                acdbEntGet(ref name), true);



            double[] point1 = new double[3];

            double[] point2 = new double[3];



            // Call imported arx function

            acedTextBox(rb.UnmanagedObject, point1, point2);



            pt1 = new Point3d(point1);

            pt2 = new Point3d(point2);

            var ptX = pt1 + Vector3d.XAxis * 40;
            var ptY = pt2 + Vector3d.YAxis * 50;


            var lX = new Line(pt1, ptX);
            var lY = new Line(pt2, ptY);

            lX.Color= Color.FromColor(System.Drawing.Color.Green);
            lY.Color= Color.FromColor(System.Drawing.Color.Orange);


            Line line = new Line(pt1, pt2);

            line.Color = Color.FromColor(System.Drawing.Color.Red);

            line.ToSpace();
            lX.ToSpace();
            lY.ToSpace();

            // Create rotation matrix

            Matrix3d rotMat = Matrix3d.Rotation(

                dbText.Rotation,

                dbText.Normal,

                pt1);



            // The returned points from acedTextBox need

            // to be transformed as follow

            pt1 = pt1.TransformBy(rotMat).Add(dbText.Position.GetAsVector());

            pt2 = pt2.TransformBy(rotMat).Add(dbText.Position.GetAsVector());

            Line linetrans = new Line(pt1, pt2);

            linetrans.Color = Color.FromColor(System.Drawing.Color.Yellow) ;

            linetrans.ToSpace();


            Vector3d rotDir = new Vector3d(

                -Math.Sin(dbText.Rotation),

                Math.Cos(dbText.Rotation), 0);


            //求垂直於rotDir和normal的法向量
            Vector3d linDir = rotDir.CrossProduct(dbText.Normal);



            double actualWidth =

                Math.Abs((pt2.GetAsVector() - pt1.GetAsVector())

                    .DotProduct(linDir));



            pt3 = pt1.Add(linDir * actualWidth);

            pt4 = pt2.Subtract(linDir * actualWidth);

            Line linetrans2 = new Line(pt3, pt4);

            linetrans2.Color = Color.FromColor(System.Drawing.Color.Blue);

            linetrans2.ToSpace();
        }

        #endregion
    }

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

.NET進階篇06-async異步、thread多線程3

知識需要不斷積累、總結和沉澱,思考和寫作是成長的催化劑

梯子

一、任務Task

System.Threading.Tasks在.NET4引入,前麵線程的API太多了,控制不方便,而ThreadPool控制能力又太弱,比如做線程的延續、阻塞、取消、超時等功能不太方便,所以Task就抽象了線程功能,在後台使用ThreadPool

1、啟動任務

可以使用TaskFactory類或Task類的構造函數和Start()方法,委託可以提供帶有一個Object類型的輸入參數,所以可以給任務傳遞任意數據,還漏了一個常用的Task.Run

TaskFactory taskFactory = new TaskFactory();
taskFactory.StartNew(() => 
{
    Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
});
Task.Factory.StartNew(() =>
{
    Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
});
Task task = new Task(() =>
{
    Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
});
task.Start();

只有Task類實例方式需要Start()去啟動任務,當然可以RunSynchronously()來同步執行任務,主線程會等待,就是用主線程來執行這個task任務

Task task = new Task(() =>
{
    Thread.Sleep(10000);
    Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
});
task.RunSynchronously();

2、阻塞延續

在Thread中我們使用join來阻塞等待,在多個Thread時進行控制就不太方便。Task中我們使用實例方法Wait阻塞單個任務或靜態方法WaitAll和WaitAny阻塞多個任務

var task = new Task(() =>
{
    Thread.Sleep(5*1000);
    Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
});
var task2 = new Task(() =>
{
    Thread.Sleep(10 * 1000);
    Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
});
task.Start();
task2.Start();
//task.Wait();//單任務等待
//Task.WaitAny(task, task2);//任何一個任務完成就繼續
Task.WaitAll(task, task2);//任務都完成才繼續

如果不希望阻塞主線程,實現當一個任務或幾個任務完成后執行別的任務,可以使用Task靜態方法WhenAll和WhenAny,他們將返回一個Task,但這個Task不允許你控制,將會在滿足WhenAll和WhenAny里任務完成時自動完成,然後調用Task的ContinueWith方法,就可以在一個任務完成后緊跟開始另一個任務

Task.WhenAll(task, task2).ContinueWith((t) =>
{
    Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
});

Task.Factory工廠中也存在類似ContinueWhenAll和ContinueWhenAny

3、任務層次結構

不僅可以在一個任務結束后執行另一個任務,也可以在一個任務內啟動一個任務,這就啟動了一個父子層次結構

var parentTask = new Task(()=> 
{
    Console.WriteLine($"parentId={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
    Thread.Sleep(5*1000);
    var childTask = new Task(() =>
    {
        Thread.Sleep(10 * 1000);
        Console.WriteLine($"childId={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}")
    });
    childTask.Start();
});
parentTask.Start();

如果父任務在子任務之前結束,父任務的狀態為WaitingForChildrenToComplete,當子任務也完成時,父任務的狀態就變為RanToCompletion,當然,在創建任務時指定TaskCreationOptions枚舉參數,可以控制任務的創建和執行的可選行為

4、枚舉參數

簡單介紹下創建任務中的TaskCreationOptions枚舉參數,創建任務時我們可以提供TaskCreationOptions枚舉參數,用於控制任務的創建和執行的可選行為的標誌

  1. AttachedToParent:指定將任務附加到任務層次結構中的某個父級,意思就是建立父子關係,父任務必須等待子任務完成才可以繼續執行。和WaitAll效果一樣。上面例子如果在創建子任務時指定TaskCreationOptions.AttachedToParent,那麼父任務wait時也會等子任務的結束
  2. DenyChildAttach:不讓子任務附加到父任務上
  3. LongRunning:指定是長時間運行任務,如果事先知道這個任務會耗時比較長,建議設置此項。這樣,Task調度器會創建Thread線程,而不使用ThreadPool線程。因為你長時間佔用ThreadPool線程不還,那它可能必要時會在線程池中開啟新的線程,造成調度壓力
  4. PreferFairness:盡可能公平的安排任務,這意味着較早安排的任務將更可能較早運行,而較晚安排運行的任務將更可能較晚運行。實際通過把任務放到線程池的全局隊列中,讓工作線程去爭搶,默認是在本地隊列中。

另一個枚舉參數是ContinueWith方法中的TaskContinuationOptions枚舉參數,它除了擁有幾個和上面同樣功能的枚舉值外,還擁有控制任務的取消延續等功能

  1. LazyCancellation:在延續取消的情況下,防止延續的完成直到完成先前的任務。什麼意思呢?
CancellationTokenSource source = new CancellationTokenSource();
source.Cancel();
var task1 = new Task(() => 
{
    Console.WriteLine($"task1 id={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
});
var task2 = task1.ContinueWith(t =>
{
    Console.WriteLine($"task2 id={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
},source.Token);
var task3 = task2.ContinueWith(t =>
{
    Console.WriteLine($"task3 id={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
});
task1.Start();

上面例子我們企圖task1->task2->task3順序執行,然後通過CancellationToken來取消task2的執行。結果會是怎樣呢?結果task1和task3會并行執行(task3也是會執行的,而且是和task1并行,等於原來的一條鏈變成了兩條鏈),然後我們嘗試使用LazyCancellation,

var task2 = task1.ContinueWith(t =>
{
    Console.WriteLine($"task2 id={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
},source.Token,TaskContinuationOptions.LazyCancellation,TaskScheduler.Current);

這樣,將會在task1執行完成后,task2才去判斷source.Token,為Cancel就不執行,接下來執行task3就保證了原來的順序

  1. ExecuteSynchronously:指定應同步執行延續任務,比如上例中,在延續任務task2中指定此參數,則task2會使用執行task1的線程來執行,這樣防止線程切換,可以做些共有資源的訪問。不指定的話就隨機,但也能也用到task1的線程
  2. NotOnRanToCompletion:延續任務必須在前面任務非完成狀態下執行
  3. OnlyOnRanToCompletion:延續任務必須在前面任務完成狀態才能執行
  4. NotOnFaulted,OnlyOnCanceled,OnlyOnFaulted等等

5、任務取消

在上篇使用Thread時,我們使用一個變量isStop標記是否取消任務,這種訪問共享變量的方式難免會出問題。task中提出CancellationTokenSource類專門處理任務取消,常見用法看下面代碼註釋

CancellationTokenSource source = new CancellationTokenSource();//構造函數中也可指定延遲取消
//註冊一個取消時調用的委託
source.Token.Register(() =>
{
    Console.WriteLine("當前source已經取消,可以在這裏做一些其他事情(比如資源清理)...");
});
var task1 = new Task(() => 
{
    while (!source.IsCancellationRequested)
    {
        Console.WriteLine($"task1 id={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
    }
},source.Token);
task1.Start();
//source.Cancel();//取消
source.CancelAfter(1000);//延時取消

6、任務結果

讓子線程返回結果,可以將信息寫入到線程安全的共享變量中去,或則使用可以返回結果的任務。使用Task的泛型版本Task<TResult>,就可以定義返回結果的任務。Task是繼承自Task的,Result獲取結果時是要阻塞等待直到任務完成返回結果的,內部判斷沒有完成則wait。通過TaskStatus屬性可獲得此任務的狀態是啟動、運行、異常還是取消等

var task = new Task<string>(() =>
{
     return "hello ketty";
});
task.Start();
string result = task.Result;

7、異常

可以使用AggregateException來接受任務中的異常信息,這是一個聚合異常繼承自Exception,可以遍歷獲取包含的所有異常,以及進行異常處理,決定是否繼續往上拋異常等

var task = Task.Factory.StartNew(() =>
{
    var childTask1 = Task.Factory.StartNew(() =>
    {
        throw new Exception("childTask1異常...");
    },TaskCreationOptions.AttachedToParent);
    var childTask12= Task.Factory.StartNew(() =>
    {
        throw new Exception("childTask2異常...");
    }, TaskCreationOptions.AttachedToParent);
});
try
{
    try
    {
        task.Wait();
    }
    catch (AggregateException ex)
    {
        foreach (var item in ex.InnerExceptions)
        {
            Console.WriteLine($"message{item.InnerException.Message}");
        }
        ex.Handle(x =>
        {
            if (x.InnerException.Message == "childTask1異常...")
            {
                return true;//異常被處理,不繼續往上拋了
            }
            return false;
        });
    }
}
catch (Exception ex)
{
    throw;
}

二、并行Parallel

1、Parallel.For()、Parallel.ForEach()

在.NET4中,另一個新增的抽象的線程時Parallel類。這個類定義了并行的for和foreach的靜態方法。Parallel.For()和Parallel.ForEach()方法多次調用一個方法,而Parallel.Invoke()方法允許同時調用不同的方法。首先Parallel是會阻塞主線程的,它將讓主線程也參与到任務中
Parallel.For()類似於for允許語句,并行迭代同一個方法,迭代順序沒有保證的

ParallelLoopResult result = Parallel.For(010, i =>
{
    Console.WriteLine($"{i} task:{Task.CurrentId} thread:{Thread.CurrentThread.ManagedThreadId}");
});
Console.WriteLine(result.IsCompleted);

也可以提前中斷Parallel.For()方法。For()方法的一個重載版本接受Action<int,parallelloopstate style=”font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;”>類型參數。一般不使用,像下面這樣,本想大於5就停止,但實際也可能有大於5的任務已經在跑了。可以通過ParallelOptions傳入允許最大線程數以及取消Token等

ParallelLoopResult result = Parallel.For(010new ParallelOptions() { MaxDegreeOfParallelism = 8 },(i,loop) =>
{
    Console.WriteLine($"{i} task:{Task.CurrentId} thread:{Thread.CurrentThread.ManagedThreadId}");
    if (i > 5)
    {
        loop.Break();
    }
});

2、Parallel.For<TLocal>

For還有一個高級泛型版本,相當於并行的聚合計算

ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, Func<TLocal> localInit, Func<int, ParallelLoopStateTLocalTLocal> body, Action<TLocal> localFinally);

像下面這樣我們求0…100的和,第三個參數更定一個種子初始值,第四個參數迭代累計,最後聚合

int totalNum = 0;
Parallel.For<int>(0100() => { return 0; }, (current, loop, total) =>
{
    total += current;
    return total;
}, (total) =>
{
    Interlocked.Add(ref totalNum, total);
});

上面For用來處理數組數據,ForEach()方法用來處理非數組的數據任務,比如字典數據繼承自IEnumerable的集合等

3、Parallel.Invoke()

Parallel.Invoke()則可以并行調用不同的方法,參數傳遞一個Action的委託數組

Parallel.Invoke(() => { Console.WriteLine($"方法1 thread:{Thread.CurrentThread.ManagedThreadId}"); }
    , () => { Console.WriteLine($"方法2 thread:{Thread.CurrentThread.ManagedThreadId}"); }
    , () => { Console.WriteLine($"方法3 thread:{Thread.CurrentThread.ManagedThreadId}"); });

4、PLinq

Plinq,為了能夠達到最大的靈活度,linq有了并行版本。使用也很簡單,只需要將原始集合AsParallel就轉換為支持并行化的查詢。也可以AsOrdered來順序執行,取消Token,強制并行等

var nums = Enumerable.Range(0100);
var query = from n in nums.AsParallel()
            select new
            {
                thread=$"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}"
            };

三、異步等待AsyncAwait

異步編程模型,可能還需要大篇幅來學習,這裏先介紹下基本用法,內在本質需要用ILSpy反編譯來看,以後可能要分專題總結。文末先給幾個參考資料,有興趣自己闊以先琢磨琢磨鴨

1、簡單使用

這是.NET4.5開始提供的一對語法糖,使得可以較簡便的使用異步編程。async用在方法定義前面,await只能寫在帶有async標記的方法中,任何方法都可以增加async,一般成對出現,只有async沒有意義,只有await會報錯,請先看下面的示例

private static async void AsyncTest()
{
    //主線程執行
    Console.WriteLine($"before await ThreadId={Thread.CurrentThread.ManagedThreadId}");
    TaskFactory taskFactory = new TaskFactory();
    Task task = taskFactory.StartNew(() =>
    {
        Thread.Sleep(3000);
        Console.WriteLine($"task ThreadId={Thread.CurrentThread.ManagedThreadId}");
    });
    await task;//主線程到這裏就返回了,執行主線程任務
    //子線程執行,其實是封裝成委託,在task之後成為回調(編譯器功能  狀態機實現) 後面相當於task.ContinueWith()
    //這個回調的線程是不確定的:可能是主線程  可能是子線程  也可能是其他線程,在winform中是主線程
    Console.WriteLine($"after await ThreadId={Thread.CurrentThread.ManagedThreadId}");
}

一般使用async都會讓方法返回一個Task的,像下面這樣複雜一點的

private static async Task<stringAsyncTest2()
{
    Console.WriteLine($"before await ThreadId={Thread.CurrentThread.ManagedThreadId}");
    TaskFactory taskFactory = new TaskFactory();
    string x = await taskFactory.StartNew(() =>
      {
          Thread.Sleep(3000);
          Console.WriteLine($"task ThreadId={Thread.CurrentThread.ManagedThreadId}");
          return "task over";
      });

    Console.WriteLine($"after await ThreadId={Thread.CurrentThread.ManagedThreadId}");
    return x;
}

通過var reslult = AsyncTest2().Result;調用即可。但注意如果調用Wait或Result的代碼位於UI線程,Task的實際執行在其他線程,其需要返回UI線程則會造成死鎖,所以應該Async all the way

2、優雅

從上面簡單示例中可以看出異步編程的執行邏輯:主線程A邏輯->異步任務線程B邏輯->主線程C邏輯
異步方法的返回類型只能是void、Task、Task。示例中異步方法的返回值類型是Task,通常void也不推薦使用,沒有返回值直接用Task就是

上一篇也大概了解到如果我們要在任務中更新UI,需要調用Invoke通知UI線程來更新,代碼看起來像下面這樣,在一個任務後去更新UI

private void button1_Click(object sender, EventArgs e)
{
    var ResultTask = Task.Run(() => {
        Thread.Sleep(5000);
        return "任務完成";
    });
    ResultTask.ContinueWith((r)=> 
    {
        textBox1.Invoke(() => {
            textBox1.Text = r.Result;
        });
    });
}

如果使用async/await會看起來像這樣,是不是優雅了許多。以看似同步編程的方式實現異步

private async void button1_Click(object sender, EventArgs e)
{
    var t = Task.Run(() => {
        Thread.Sleep(5000);
        return "任務完成";
    });
    textBox1.Text = await t;
}

3、最後

在.NET 4.5中引入的Async和Await兩個新的關鍵字后,用戶能以一種簡潔直觀的方式實現異步編程。甚至都不需要改變代碼的邏輯結構,就能將原來的同步函數改造為異步函數。
在內部實現上,Async和Await這兩個關鍵字由編譯器轉換為狀態機,通過System.Threading.Tasks中的并行類實現代碼的異步執行。

字數有點多了,我的能力也就高考作文800字能寫的出奇好。看了很多異步編程,腦袋有點炸,等消化后再輸出一次,技藝不足,只能用輸出倒逼輸入了,下一篇會是線程安全集合、鎖問題、同步問題,基於事件的異步模式等

Search the fucking web
Read the fucking maunal

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

HttpClient在高併發場景下的優化實戰

在項目中使用HttpClient可能是很普遍,尤其在當下微服務大火形勢下,如果服務之間是http調用就少不了跟http客戶端找交道.由於項目用戶規模不同以及應用場景不同,很多時候可能不需要特別處理也.然而在一些高併發場景下必須要做一些優化.

項目是快遞公司的快件軌跡查詢項目,目前平均每小時調用量千萬級別.軌跡查詢以Oracle為主要數據源,Mongodb為備用,當Oracle不可用時,數據源切換到Mongodb.今年菜鳥團隊加入后,主要數據遷移到了阿里雲上,以Hbase為主要存儲.其中Hbase數據查詢服務由數據解析組以Http方式提供.原有Mongodb棄用,雲上數據源變為主數據源,Oracle作為備用.當數據源切換以後,主要的調用方式也就變成了http方式.在第10月初第一輪雙11壓測試跑上,qps不達標.當然這個問題很好定位,因為十一假之間軌跡域組內已經進行過試跑,當時查的是oracle.十一假期回來后,只有這一處明顯的改動,很容易定位到問題出現在調用上.但具體是雲上Hbase慢,還是網絡傳輸問題(Hbase是阿里雲上的服務,軌跡查詢項目部署在IDC機房).通過雲服務,解析組和網絡運維的配合,確定問題出現在應用程序上.在Http服務調用處打日誌記錄,發現以下問題:

可以看到每隔一段時間,就會有不少請求的耗時明顯比其它的要高.

導致這種情況可能可能是HttpClient反覆創建銷毀造成引起來銷,首先憑經驗可能是對HttpClient進行了Dispose操作(Using(HttpClient client=new HttpClient){…})

如果你裝了一些第三方插件,當你寫的HttpClient沒有被Using包圍的時候會給出重構建議,建議加上Using或者手動dispose.然而實際中是否要dispose還要視情況而定,對於一般項目大家的感覺可能是不加也沒有大問題,加了也還ok.但是實際項目中,一般不建議反覆重新創建這個對象,關於HttpClient是否需要Dispose請看

在對這個問題的答案里,提問者指出微軟的一些示例也是沒有使用using的.下面一個比較熱的回答指出HttpClient生命周期應該和應用程序生命周期一致,只要應用程序需要Http請求,就不應用把它Dispose掉.下面的一個仍然相對比較熱的回答指出一般地,帶有Dispose方法的對象都應當被dispose掉,但是HttpClient是一個例外.

當然以上只是結合自己的經驗對一個大家都可能比較困惑的問題給出一個建議,實際上對於一般的項目,用還是不用Dispose都不會造成很大問題.本文中上面提到的問題跟HttpClient也沒有關係,因為程序中使用的Http客戶端是基於HttpWebRequest封裝的.

問題排查及優化

經過查詢相關資料以及同行的經驗分享(給了很大啟發)

查看代碼,request.KeepAlive = true;查詢,這個屬性其實是設置一個’Keep-alive’請求header,當時同事封裝Http客戶端的場景現場無從得知,然而對於本文中提到的場景,由於每次請求的都是同一個接口,因此保持持續連接顯然能夠減少反覆創建tcp連接的開銷.因此註釋掉這一行再發布測試,以上問題便不復出現了!

當然實際中做的優化絕不僅僅是這一點,如果僅僅是這樣,一句話就能夠說完了,大家都記住以後就這樣做就Ok了.實際上還參考了不少大家在實際項目中的經驗或者坑.下面把整個HttpClient代碼貼出來,下面再對關鍵部分進行說明.

 public static string Request(string requestUrl, string requestData, out bool isSuccess, string contentType = "application/x-www-form-urlencoded;charset=utf8")
    {
        string apiResult = "";
        isSuccess = false;
        if (string.IsNullOrEmpty(requestData))
        {
            return apiResult;
        }
        HttpWebRequest request = null;
        HttpWebResponse response = null;
        try
        {
            byte[] buffer = Encoding.UTF8.GetBytes(requestData);
            request = WebRequest.Create($"{requestUrl}") as HttpWebRequest;
            request.ContentType = "application/json";
            request.Method = "POST";
            request.ContentLength = buffer.Length;
            request.Timeout =200;
            request.ReadWriteTimeout =  Const.HttpClientReadWriteTimeout
            request.ServicePoint.Expect100Continue = false;
            request.ServicePoint.UseNagleAlgorithm = false;
            request.ServicePoint.ConnectionLimit = 2000
            request.AllowWriteStreamBuffering = false;
            request.Proxy = null;

            using (var stream = request.GetRequestStream())
            {
                stream.Write(buffer, 0, buffer.Length);
            }
            using (response = (HttpWebResponse)request.GetResponse())
            {
                string encoding = response.ContentEncoding;
                using (var stream = response.GetResponseStream())
                {
                    if (string.IsNullOrEmpty(encoding) || encoding.Length < 1)
                    {
                        encoding = "UTF-8"; //默認編碼
                    }

                    if (stream != null)
                    {
                        using (StreamReader reader = new StreamReader(stream, Encoding.GetEncoding(encoding)))
                        {
                            apiResult = reader.ReadToEnd();
                            //byte[] bty = stream.ReadBytes();
                            //apiResult = Encoding.UTF8.GetString(bty);
                        }
                    }
                    else
                    {
                        throw new Exception("響應流為null!");
                    }
                }
            }
            isSuccess = true;
        }
        catch (Exception err)
        {
            isSuccess = false;
            LogUtilities.WriteException(err);
        }
        finally
        {
            response?.Close();
            request?.Abort();
        }

        return apiResult;
    }
  • 首先是TimeOut問題,不僅僅是在高併發場景下,實際項目中建議不管是任何場景都要設置它的值.在HttpWebRequest對象中,它的默認值是100000毫秒,也就是100秒.如果服務端出現問題,默認設置將會造成嚴重阻塞,對於普通項目也會嚴重影響用戶體驗.返回失敗讓用戶重試也比這樣長時間等待體驗要好.

  • ReadWriteTimeout很多朋友可能沒有接觸過這個屬性,尤其是使用.net 4.5里HttpClient對象的朋友.有過Socket編程經驗的朋友可能會知道,socket連接有連接超時時間和傳輸超時時間,這裏的ReadWriteTimeout類似於Socket編程里的傳輸超時時間.從字面意思上看,就是讀寫超時時間,防止數據量過大或者網絡問題導致流傳入很長時間都無法完成.當然在一般場景下大家可以完全不理會它,如果由於網絡原因造成讀寫超時也很有可能造成連接超時.這裏之所以設置這個值是由於實際業務場景決定的.大家可能已經看到,以上代碼對於ReadWriteTimeout的設置並不像Timeout一樣設置為一個固定值,而是放在了一個Const類中,它實際上是讀取一個配置,根據配置動態決定值的大小.實際中的場景是這樣的,由於壓測環境無法完全模擬真實的用戶訪問場景.壓測的時候都是使用單個單號進行軌跡查詢,但是實際業務中允許用戶一次請求查詢最多多達數百個單號.一個單號的軌跡記錄一般都是幾十KB大小,如果請求的單號數量過多數量量就會極大增加長,查詢時間和傳輸時間都會極大增加,為了保證雙11期間大多數用戶能正常訪問,必要時會把這個時間設置的很小(默認是3000毫秒),讓單次查詢量大的用戶快速失敗.

以上只是一種備用方案,不得不承認,既然系統允許一次查詢多個單號,因此在用戶在沒有達到上限之前所有的請求都是合法的,也是應該予以支持的,因此以上做法實際上有損用戶體驗的,然而系統的資源是有限的,要必要的時候只能犧牲特殊用戶的利益,保證絕大多數用戶的利益.雙11已經渡過,實際中雙11也沒有改動以上配置的值,但是做為風險防範增加動態配置是必要的.

這裏再多差一下嘴,就是關於ContentLength它的值是可以不設置的,不設置時程序會自動計算,但是設置的時候一定要設置字節數組的長度,而不是字符串的長度,因為包含中文時,根據編碼規則的不同,中文字符可能佔用兩個字節或者更長字節長度.

  • 關於 request.ServicePoint.Expect100Continue = false; request.ServicePoint.UseNagleAlgorithm = false;這兩項筆者也不是特別清楚,看了相關文檔也沒有特別明白,還請了解的朋友指點,大家共同學習進步.

  • request.ServicePoint.ConnectionLimit = 2000是設置最大的連接數,不少朋友是把這個數值設置為65536,實際上單台服務器web併發連接遠太不到這個數值.這裏根據項目的實際情況,設置為2000.以防止處理能力不足時,請求隊列過長.

  • request.AllowWriteStreamBuffering = false;根據[微軟文檔()]這個選項設置為true時,數據將緩衝到內存中,以便在重定向或身份驗證請求時可以重新發送數據.

最為重要的是,文檔中說將 AllowWriteStreamBuffering 設置為 true 可能會在上傳大型數據集時導致性能問題,因為數據緩衝區可能會使用所有可用內存。由於發送的請求僅僅是單號,數據量很小,並且很少有用戶一個單號反覆查詢的需求.加上可能會有副作用.這裏設置為false.

  • request.Proxy = null;這裡是參考了一個一位網友的文章,裏面提到默認的Proxy導致超時怪異行為.由於解決問題是在10月份,據寫這篇文章已經有一段時間了,因此再尋找時便找不到這篇文章了.有興趣的朋友可以自己搜索一下.

很多朋友可能會關心,通過以上配置到底有沒有解決問題.實際中以上配置后已經經歷了雙11峰值qps過萬的考驗.下面給出寫本文時候請求耗時的監控

可以看到,整體上請求耗時比較平穩.

可能看了這個圖,有些朋友還是會有疑問,通過上面日誌截圖可以看到,除了耗時在100ms以上的請求外,普通的耗時在四五十毫秒的還是有很多的,但是下面這個截圖裡都是在10到20區間浮動,最高的也就30ms.這其實是由於在壓測的過程中,發現Hbase本身也有不穩定的因素(大部分請求響應耗時都很平穩,但是偶爾會有個別請求婁千甚至數萬毫秒(在監控圖上表現為一個很突兀的線,一般習慣稱為毛刺),這在高併發場景下是不能接受的,問題反饋以後阿里雲對Hbase進行了優化,優化以後耗時也有所下降.)

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

Kafka冪等性原理及實現剖析

1.概述

最近和一些同學交流的時候反饋說,在面試Kafka時,被問到Kafka組件組成部分、API使用、Consumer和Producer原理及作用等問題都能詳細作答。但是,問到一個平時不注意的問題,就是Kafka的冪等性,被卡主了。那麼,今天筆者就為大家來剖析一下Kafka的冪等性原理及實現。

2.內容

2.1 Kafka為啥需要冪等性?

Producer在生產發送消息時,難免會重複發送消息。Producer進行retry時會產生重試機制,發生消息重複發送。而引入冪等性后,重複發送只會生成一條有效的消息。Kafka作為分佈式消息系統,它的使用場景常見與分佈式系統中,比如消息推送系統、業務平台系統(如物流平台、銀行結算平台等)。以銀行結算平台來說,業務方作為上游把數據上報到銀行結算平台,如果一份數據被計算、處理多次,那麼產生的影響會很嚴重。

2.2 影響Kafka冪等性的因素有哪些?

在使用Kafka時,需要確保Exactly-Once語義。分佈式系統中,一些不可控因素有很多,比如網絡、OOM、FullGC等。在Kafka Broker確認Ack時,出現網絡異常、FullGC、OOM等問題時導致Ack超時,Producer會進行重複發送。可能出現的情況如下:

 

 

2.3 Kafka的冪等性是如何實現的?

Kafka為了實現冪等性,它在底層設計架構中引入了ProducerID和SequenceNumber。那這兩個概念的用途是什麼呢?

  • ProducerID:在每個新的Producer初始化時,會被分配一個唯一的ProducerID,這個ProducerID對客戶端使用者是不可見的。
  • SequenceNumber:對於每個ProducerID,Producer發送數據的每個Topic和Partition都對應一個從0開始單調遞增的SequenceNumber值。

2.3.1 冪等性引入之前的問題?

Kafka在引入冪等性之前,Producer向Broker發送消息,然後Broker將消息追加到消息流中后給Producer返回Ack信號值。實現流程如下:

 

上圖的實現流程是一種理想狀態下的消息發送情況,但是實際情況中,會出現各種不確定的因素,比如在Producer在發送給Broker的時候出現網絡異常。比如以下這種異常情況的出現:

 

上圖這種情況,當Producer第一次發送消息給Broker時,Broker將消息(x2,y2)追加到了消息流中,但是在返回Ack信號給Producer時失敗了(比如網絡異常) 。此時,Producer端觸發重試機制,將消息(x2,y2)重新發送給Broker,Broker接收到消息后,再次將該消息追加到消息流中,然後成功返回Ack信號給Producer。這樣下來,消息流中就被重複追加了兩條相同的(x2,y2)的消息。

2.3.2 冪等性引入之後解決了什麼問題?

面對這樣的問題,Kafka引入了冪等性。那麼冪等性是如何解決這類重複發送消息的問題的呢?下面我們可以先來看看流程圖:

 

 同樣,這是一種理想狀態下的發送流程。實際情況下,會有很多不確定的因素,比如Broker在發送Ack信號給Producer時出現網絡異常,導致發送失敗。異常情況如下圖所示:

 

 當Producer發送消息(x2,y2)給Broker時,Broker接收到消息並將其追加到消息流中。此時,Broker返回Ack信號給Producer時,發生異常導致Producer接收Ack信號失敗。對於Producer來說,會觸發重試機制,將消息(x2,y2)再次發送,但是,由於引入了冪等性,在每條消息中附帶了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber發送給Broker,而之前Broker緩存過之前發送的相同的消息,那麼在消息流中的消息就只有一條(x2,y2),不會出現重複發送的情況。

2.3.3 ProducerID是如何生成的?

客戶端在生成Producer時,會實例化如下代碼:

// 實例化一個Producer對象
Producer<String, String> producer = new KafkaProducer<>(props);

在org.apache.kafka.clients.producer.internals.Sender類中,在run()中有一個maybeWaitForPid()方法,用來生成一個ProducerID,實現代碼如下:

 private void maybeWaitForPid() {
        if (transactionState == null)
            return;

        while (!transactionState.hasPid()) {
            try {
                Node node = awaitLeastLoadedNodeReady(requestTimeout);
                if (node != null) {
                    ClientResponse response = sendAndAwaitInitPidRequest(node);
                    if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
                        InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
                        transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
                    } else {
                        log.error("Received an unexpected response type for an InitPidRequest from {}. " +
                                "We will back off and try again.", node);
                    }
                } else {
                    log.debug("Could not find an available broker to send InitPidRequest to. " +
                            "We will back off and try again.");
                }
            } catch (Exception e) {
                log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
            }
            log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
            time.sleep(retryBackoffMs);
            metadata.requestUpdate();
        }
    }

3.事務

與冪等性有關的另外一個特性就是事務。Kafka中的事務與數據庫的事務類似,Kafka中的事務屬性是指一系列的Producer生產消息和消費消息提交Offsets的操作在一個事務中,即原子性操作。對應的結果是同時成功或者同時失敗。

這裏需要與數據庫中事務進行區別,操作數據庫中的事務指一系列的增刪查改,對Kafka來說,操作事務是指一系列的生產和消費等原子性操作。

3.1 Kafka引入事務的用途?

在事務屬性引入之前,先引入Producer的冪等性,它的作用為:

  • Producer多次發送消息可以封裝成一個原子性操作,即同時成功,或者同時失敗;
  • 消費者&生產者模式下,因為Consumer在Commit Offsets出現問題時,導致重複消費消息時,Producer重複生產消息。需要將這個模式下Consumer的Commit Offsets操作和Producer一系列生產消息的操作封裝成一個原子性操作。

產生的場景有:

比如,在Consumer中Commit Offsets時,當Consumer在消費完成時Commit的Offsets為100(假設最近一次Commit的Offsets為50),那麼執行觸發Balance時,其他Consumer就會重複消費消息(消費的Offsets介於50~100之間的消息)。

3.2 事務提供了哪些可使用的API?

Producer提供了五種事務方法,它們分別是:initTransactions()、beginTransaction()、sendOffsetsToTransaction()、commitTransaction()、abortTransaction(),代碼定義在org.apache.kafka.clients.producer.Producer<K,V>接口中,具體定義接口如下:

// 初始化事務,需要注意確保transation.id屬性被分配
void initTransactions();

// 開啟事務
void beginTransaction() throws ProducerFencedException;

// 為Consumer提供的在事務內Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;

// 提交事務
void commitTransaction() throws ProducerFencedException;

// 放棄事務,類似於回滾事務的操作
void abortTransaction() throws ProducerFencedException;

3.3 事務的實際應用場景有哪些?

在Kafka事務中,一個原子性操作,根據操作類型可以分為3種情況。情況如下:

  • 只有Producer生產消息,這種場景需要事務的介入;
  • 消費消息和生產消息並存,比如Consumer&Producer模式,這種場景是一般Kafka項目中比較常見的模式,需要事務介入;
  • 只有Consumer消費消息,這種操作在實際項目中意義不大,和手動Commit Offsets的結果一樣,而且這種場景不是事務的引入目的。

4.總結

Kafka的冪等性和事務是比較重要的特性,特別是在數據丟失和數據重複的問題上非常重要。Kafka引入冪等性,設計的原理也比較好理解。而事務與數據庫的事務特性類似,有數據庫使用的經驗對理解Kafka的事務也比較容易接受。

5.結束語

這篇博客就和大家分享到這裏,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!

另外,博主出書了《》和《》,喜歡的朋友或同學, 可以在公告欄那裡點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。關注下面公眾號,根據提示,可免費獲取書籍的教學視頻。 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

【原創】(十一)Linux內存管理slub分配器

背景

  • Read the fucking source code! –By 魯迅
  • A picture is worth a thousand words. –By 高爾基

說明:

  1. Kernel版本:4.14
  2. ARM64處理器,Contex-A53,雙核
  3. 使用工具:Source Insight 3.5, Visio

1. 概述

之前的文章分析的都是基於頁面的內存分配,而小塊內存的分配和管理是通過塊分配器來實現的。目前內核中,有三種方式來實現小塊內存分配:slab, slub, slob,最先有slab分配器,slub/slob分配器是改進版,slob分配器適用於小內存嵌入式設備,而slub分配器目前已逐漸成為主流塊分配器。接下來的文章,就是以slub分配器為目標,進一步深入。

先來一個初印象:

2. 數據結構

有四個關鍵的數據結構:

  • struct kmem_cache:用於管理SLAB緩存,包括該緩存中對象的信息描述,per-CPU/Node管理slab頁面等;
    關鍵字段如下:
/*
 * Slab cache management.
 */
struct kmem_cache {
    struct kmem_cache_cpu __percpu *cpu_slab;       //每個CPU slab頁面
    /* Used for retriving partial slabs etc */
    unsigned long flags;
    unsigned long min_partial;
    int size;       /* The size of an object including meta data */
    int object_size;    /* The size of an object without meta data */
    int offset;     /* Free pointer offset. */
#ifdef CONFIG_SLUB_CPU_PARTIAL
    /* Number of per cpu partial objects to keep around */
    unsigned int cpu_partial;
#endif
    struct kmem_cache_order_objects oo;     //該結構體會描述申請頁面的order值,以及object的個數

    /* Allocation and freeing of slabs */
    struct kmem_cache_order_objects max;
    struct kmem_cache_order_objects min;
    gfp_t allocflags;   /* gfp flags to use on each alloc */
    int refcount;       /* Refcount for slab cache destroy */
    void (*ctor)(void *);           // 對象構造函數
    int inuse;      /* Offset to metadata */
    int align;      /* Alignment */
    int reserved;       /* Reserved bytes at the end of slabs */
    int red_left_pad;   /* Left redzone padding size */
    const char *name;   /* Name (only for display!) */
    struct list_head list;  /* List of slab caches */       //kmem_cache最終會鏈接在一個全局鏈表中
    struct kmem_cache_node *node[MAX_NUMNODES];     //Node管理slab頁面
};
  • struct kmem_cache_cpu:用於管理每個CPU的slab頁面,可以使用無鎖訪問,提高緩存對象分配速度;
struct kmem_cache_cpu {
    void **freelist;    /* Pointer to next available object */                  //指向空閑對象的指針
    unsigned long tid;  /* Globally unique transaction id */                
    struct page *page;  /* The slab from which we are allocating */     //slab緩存頁面
#ifdef CONFIG_SLUB_CPU_PARTIAL
    struct page *partial;   /* Partially allocated frozen slabs */
#endif
#ifdef CONFIG_SLUB_STATS
    unsigned stat[NR_SLUB_STAT_ITEMS];
#endif
};
  • struct kmem_cache_node:用於管理每個Node的slab頁面,由於每個Node的訪問速度不一致,slab頁面由Node來管理;
/*
 * The slab lists for all objects.
 */
struct kmem_cache_node {
    spinlock_t list_lock;

#ifdef CONFIG_SLUB
    unsigned long nr_partial;    //slab頁表數量
    struct list_head partial;       //slab頁面鏈表
#ifdef CONFIG_SLUB_DEBUG
    atomic_long_t nr_slabs;
    atomic_long_t total_objects;
    struct list_head full;
#endif
#endif
};
  • struct page:用於描述slab頁面struct page結構體中很多字段都是通過union聯合體進行復用的。
    struct page結構中,用於slub的成員如下:
struct page {
    union {
       ...
        void *s_mem;            /* slab first object */
       ...
    };
    
    /* Second double word */
    union {
       ...
        void *freelist;     /* sl[aou]b first free object */
       ...
    };
    
    union {
       ...
        struct {
            union {
              ...
                struct {            /* SLUB */
                    unsigned inuse:16;
                    unsigned objects:15;
                    unsigned frozen:1;
                };
                ...
            };
       ...
        };       
    };   
    
    /*
     * Third double word block
     */
    union {
       ...
        struct {        /* slub per cpu partial pages */
            struct page *next;  /* Next partial slab */
#ifdef CONFIG_64BIT
            int pages;  /* Nr of partial slabs left */
            int pobjects;   /* Approximate # of objects */
#else
            short int pages;
            short int pobjects;
#endif
        };

        struct rcu_head rcu_head;   /* Used by SLAB
                         * when destroying via RCU
                         */
    };
    ...
        struct kmem_cache *slab_cache;  /* SL[AU]B: Pointer to slab */    
    ...
}

圖來了:

3. 流程分析

針對Slub的使用,可以從三個維度來分析:

  1. slub緩存創建
  2. slub對象分配
  3. slub對象釋放

下邊將進一步分析。

3.1 kmem_cache_create

在內核中通過kmem_cache_create接口來創建一個slab緩存

先看一下這個接口的函數調用關係圖:

  1. kmem_cache_create完成的功能比較簡單,就是創建一個用於管理slab緩存kmem_cache結構,並對該結構體進行初始化,最終添加到全局鏈表中。kmem_cache結構體初始化,包括了上文中分析到的kmem_cache_cpukmem_cache_node兩個字段結構。

  2. 在創建的過程中,當發現已有的slab緩存中,有存在對象大小相近,且具有兼容標誌的slab緩存,那就只需要進行merge操作並返回,而無需進一步創建新的slab緩存

  3. calculate_sizes函數會根據指定的force_order或根據對象大小去計算kmem_cache結構體中的size/min/oo等值,其中kmem_cache_order_objects結構體,是由頁面分配order值和對象數量兩者通過位域拼接起來的。

  4. 在創建slab緩存的時候,有一個先雞后蛋的問題:kmem_cache結構體來管理一個slab緩存,而創建kmem_cache結構體又是從slab緩存中分配出來的對象,那麼這個問題是怎麼解決的呢?可以看一下kmem_cache_init函數,內核中定義了兩個靜態的全局變量kmem_cachekmem_cache_node,在kmem_cache_init函數中完成了這兩個結構體的初始化之後,相當於就是創建了兩個slab緩存,一個用於分配kmem_cache結構體對象的緩存池,一個用於分配kmem_cache_node結構體對象的緩存池。由於kmem_cache_cpu結構體是通過__alloc_percpu來分配的,因此不需要創建一個相關的slab緩存

3.2 kmem_cache_alloc

kmem_cache_alloc接口用於從slab緩存池中分配對象。

看一下大體的調用流程圖:

從上圖中可以看出,分配slab對象與Buddy System中分配頁面類似,存在快速路徑和慢速路徑兩種,所謂的快速路徑就是per-CPU緩存,可以無鎖訪問,因而效率更高。

整體的分配流程大體是這樣的:優先從per-CPU緩存中進行分配,如果per-CPU緩存中已經全部分配完畢,則從Node管理的slab頁面中遷移slab頁per-CPU緩存中,再重新分配。當Node管理的slab頁面也不足的情況下,則從Buddy System中分配新的頁面,添加到per-CPU緩存中。

還是用圖來說明更清晰,分為以下幾步來分配:

  1. fastpath
    快速路徑下,以原子的方式檢索per-CPU緩存的freelist列表中的第一個對象,如果freelist為空並且沒有要檢索的對象,則跳入慢速路徑操作,最後再返回到快速路徑中重試操作。

  2. slowpath-1
    將per-CPU緩存中page指向的slab頁中的空閑對象遷移到freelist中,如果有空閑對象,則freeze該頁面,沒有空閑對象則跳轉到slowpath-2

  3. slowpath-2
    將per-CPU緩存中partial鏈表中的第一個slab頁遷移到page指針中,如果partial鏈表為空,則跳轉到slowpath-3

  4. slowpath-3
    將Node管理的partial鏈表中的slab頁遷移到per-CPU緩存中的page中,並重複第二個slab頁將其添加到per-CPU緩存中的partial鏈表中。如果遷移的slab中空閑對象超過了kmem_cache.cpu_partial的一半,則僅遷移slab頁,並且不再重複。
    如果每個Node的partial鏈表都為空,跳轉到slowpath-4

  5. slowpath-4
    Buddy System中獲取頁面,並將其添加到per-CPU的page中。

3.2 kmem_cache_free

kmem_cache_free的操作,可以看成是kmem_cache_alloc的逆過程,因此也分為快速路徑和慢速路徑兩種方式,同時,慢速路徑中又分為了好幾種情況,可以參考kmem_cache_alloc的過程。

調用流程圖如下:

效果如下:

  1. 快速路徑釋放
    快速路徑下,直接將對象返回到freelist中即可。

  2. put_cpu_partial
    put_cpu_partial函數主要是將一個剛freeze的slab頁,放入到partial鏈表中。
    put_cpu_partial函數中調用unfreeze_partials函數,這時候會將per-CPU管理的partial鏈表中的slab頁面添加到Node管理的partial鏈表的尾部。如果超出了Node的partial鏈表,溢出的slab頁面中沒有分配對象的slab頁面將會返回到夥伴系統。

  3. add_partial
    添加slab頁到Node的partial鏈表中。

  4. remove_partial
    從Node的partial鏈表移除slab頁。

具體釋放的流程走哪個分支,跟對象的使用情況,partial鏈表的個數nr_partial/min_partial等相關,細節就不再深入分析了。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

I/O多路復用模型

背景

在文章中提到了五種I/O模型,其中前四種:阻塞模型、非阻塞模型、信號驅動模型、I/O復用模型都是同步模型;還有一種是異步模型。

想寫一個系列的文章,介紹從I/O多路復用到異步編程和RPC框架,整個演進過程,這一系列可能包括:

  1. Reactor和Proactor模型
  2. 為什麼需要異步編程
  3. enable_shared_from_this用法分析
  4. 網絡通信庫和RPC

為什麼有多路復用?

多路復用技術要解決的是“通信”問題,解決核心在於“同步事件分離器”(de-multiplexer),linux系統帶有的分離器select、poll、epoll網上介紹的比較多,大家可以看看這篇介紹的不錯的文章:。通信的一方想要知道另一方的狀態(以決定自己做什麼),有兩種方法: 一是輪詢,二是消息通知。

輪詢

輪詢的一種典型的實現可能是這樣的:當然這裏的epoll_wait()也可以使用poll()或者select()替換。

whiletrue) {
    active_stream[] = epoll_wait(epollfd)
    for i in active_stream[] {
        read or write till
    }
}

輪詢方式主要存在以下不足:

  • 增加系統開銷。無論是任務輪詢還是定時器輪詢都需要消耗對應的系統資源。
  • 無法及時感知設備狀態變化。在輪詢間隔內的設備狀態變化只有在下次輪詢時才能被發現,這將無法滿足對實時性敏感的應用場合。
  • 浪費CPU資源。無論設備是否發生狀態改變,輪詢總在進行。在實際情況中,大多數設備的狀態改變通常不會那麼頻繁,輪詢空轉將白白浪費CPU時間片。

消息通知

其實現方式通常是: “阻塞-通知”機制。阻塞會導致一個任務(task_struct,進程或者線程)只能處理一個”I/O流”或者類似的操作,要處理多個,就要多個任務(需要多個進程或線程),因此靈活性上又不如輪詢(一個任務足夠),很矛盾。

 

select、poll、epoll對比

矛盾的根源就是”一”和”多”的矛盾: 希望一個任務處理多個對象,同時避免處理阻塞-通知機制的內部細節。解決方案是多路復用(muliplex)。多路復用有3種基本方案,select()/poll()/epoll(),都是來解決這一矛盾的。

  • 通知代理: 用戶把需要關心的對象註冊給select()/poll()/epoll()函數。
  • 一對多: 所有的被關心的對象,只要有一個對象有了通知事件,select()/poll()/epoll()就會結束阻塞狀態。
  • 方便性: 用戶(程序員)不用再關心如何阻塞和被通知,以及哪些情況下會有通知產生。這件事情已經由上述幾個系統調用做了,用戶只需要實現”通知來了我該做什麼”。

 

那麼上面3個系統調用的區別是什麼呢?
第一個select(),結合了輪詢和阻塞兩種方式,沒有問題,每次有一個對象事件發生的時候,select()只是知道有事件發生了,具體是哪個對象發生的,不知道,需要從頭到尾輪詢一遍,複雜度是O(n)。poll函數相對select函數變化不大,只是提升了最大的可輪詢的對象個數。epoll函數把時間複雜度降到O(1)。

 

為什麼select慢而epoll效率高?
select()之所以慢,有幾個原因: select()的參數是一個FD數組,意味着每次select調用,都是一次新的註冊-阻塞-回調,每次select都要把一個數組從用戶空間拷貝到內核空間,內核檢測到某個對象狀態變化並寫入后,再從內核空間拷貝回用戶空間,select再把這個數組讀取一遍,並返回。這個過程非常低效。

epoll的解決方案相當於是一種對select()的算法優化: 它把select()一個函數做的事情分解成了3步,首先epoll_create()創建一個epollfd對象(相當於一個池子),然後所有被監聽的fd通過epoll_ctrl()註冊到這個池子,也就是為每個fd指定了一個內部的回調函數(這樣,就沒有了每次調用時的來回拷貝,用戶空間的數組到內核空間只有這一次拷貝)。epoll_wait阻塞等待。在內核態有一個和epoll_wait對應的函數調用,把就緒的fd,填入到一個就緒列表中,而epoll_wait讀取這個就緒列表,做到了快速返回(O(1))。

詳細的對比可以參考select、poll、epoll之間的區別總結:

 

有了上面的原理介紹,這裏舉例來說明下epoll到底是怎麼使用的,加深理解。舉兩個例子:

一個是比較簡單的父子進程通信的例子,單個小程序,不需要跑多個應用實例,不需要用戶輸入。
一個是比較實戰的socket+epoll,畢竟現實案例中哪有兩個父子進程間通訊這麼簡單的應用場景。

有了多路復用,難道還不夠?

有了I/O復用,有了epoll已經可以使服務器併發幾十萬連接的同時,維持高TPS了,難道這還不夠嗎?答案是,技術層面足夠了,但在軟件工程層面卻是不夠的。例如,總要有個for循環去調用epoll,總來處理epoll的返回,這是每次都要重複的工作。for循環體裏面寫什麼—-通知返回之後,做事情的程序最好能以一種回調的機制,提供一個編程框架,讓程序更有結構一些。另一方面,如果希望每個事件通知之後,做的事情能有機會被代理到某個線程裏面去單獨運行,而線程完成的狀態又能通知回主任務,那麼”異步”的進制就必須被引入。

所以,還有兩個問題要解決,一是”編程框架”,一是”異步”。我們先看幾個目前流行的框架,大部分框架已經包含了某種異步的機制。我們接下來的篇章將介紹“編程框架”和“異步I/O模型”。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

MySQL 複製表結構和表數據

1、前言

  在功能開發完畢,在本地或者測試環境進行測試時,經常會遇到這種情況:有專門的測試數據,測試過程會涉及到修改表中的數據,經常不能一次測試成功,所以,每次執行測試后,原來表中的數據其實已經被修改了,下一次測試,就需要將數據恢復。

  我一般的做法是:先創建一個副本表,比如測試使用的user表,我在測試前創建副本表user_bak,每次測試后,將user表清空,然後將副本表user_bak的數據導入到user表中。

  上面的操作是對一個table做備份,如果涉及到的table太多,可以創建database的副本。

  接下來我將對此處的表結構複製以及表數據複製進行闡述,並非數據庫的複製原理!!!!

  下面是staff表的表結構

create table staff (
	id int not null auto_increment comment '自增id',
	name char(20) not null comment '用戶姓名',
	dep char(20) not null comment '所屬部門',
	gender tinyint not null default 1 comment '性別:1男; 2女',
	addr char(30) not null comment '地址',
	primary key(id),
	index idx_1 (name, dep),
	index idx_2 (name, gender)
) engine=innodb default charset=utf8mb4 comment '員工表';

 

2、具體方式 

  2.1、執行舊錶的創建SQL來創建表

  如果原始表已經存在,那麼可以使用命令查看該表的創建語句:

mysql> show create table staff\G
*************************** 1. row ***************************
       Table: staff
Create Table: CREATE TABLE `staff` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `name` char(20) NOT NULL COMMENT '用戶姓名',
  `dep` char(20) NOT NULL COMMENT '所屬部門',
  `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
  `addr` char(30) NOT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_1` (`name`,`dep`),
  KEY `idx_2` (`name`,`gender`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='員工表'
1 row in set (0.01 sec)

  可以看到,上面show creat table xx的命令執行結果中,Create Table的值就是創建表的語句,此時可以直接複製創建表的SQL,然後重新執行一次就行了。

  當數據表中有數據的時候,看到的創建staff表的sql就會稍有不同。比如,我在staff中添加了兩條記錄:

mysql> insert into staff values (null, '李明', 'RD', 1, '北京');
Query OK, 1 row affected (0.00 sec)

mysql> insert into staff values (null, '張三', 'PM', 0, '上海');
Query OK, 1 row affected (0.00 sec)

mysql> select * from staff;
+----+--------+-----+--------+--------+
| id | name   | dep | gender | addr   |
+----+--------+-----+--------+--------+
|  1 | 李明   | RD  |      1 | 北京   |
|  2 | 張三   | PM  |      0 | 上海   |
+----+--------+-----+--------+--------+
2 rows in set (0.00 sec)

  此時在執行show create table命令:

mysql> show create table staff\G
*************************** 1. row ***************************
       Table: staff
Create Table: CREATE TABLE `staff` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `name` char(20) NOT NULL COMMENT '用戶姓名',
  `dep` char(20) NOT NULL COMMENT '所屬部門',
  `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
  `addr` char(30) NOT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_1` (`name`,`dep`),
  KEY `idx_2` (`name`,`gender`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COMMENT='員工表'
1 row in set (0.00 sec)

  注意,上面結果中的倒數第二行

    ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COMMENT=’員工表’

  因為staff表的id是自增的,且已經有了2條記錄,所以下一次插入數據的自增id應該為3,這個信息,也會出現在表的創建sql中。

    

  2.2、使用like創建新表(僅包含表結構)

  使用like根據已有的表來創建新表,特點如下:

  1、方便,不需要查看原表的表結構定義信息;

  2、創建的新表中,表結構定義、完整性約束,都與原表保持一致。

  3、創建的新表是一個空表,全新的表,沒有數據。

  用法如下:

mysql> select * from staff;  #舊錶中已有2條數據
+----+--------+-----+--------+--------+
| id | name   | dep | gender | addr   |
+----+--------+-----+--------+--------+
|  1 | 李明   | RD  |      1 | 北京   |
|  2 | 張三   | PM  |      0 | 上海   |
+----+--------+-----+--------+--------+
2 rows in set (0.00 sec)

mysql> create table staff_bak_1 like staff;  # 直接使用like,前面指定新表名,後面指定舊錶(參考的表)
Query OK, 0 rows affected (0.02 sec)

mysql> show create table staff_bak_1\G
*************************** 1. row ***************************
       Table: staff_bak_1
Create Table: CREATE TABLE `staff_bak_1` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `name` char(20) NOT NULL COMMENT '用戶姓名',
  `dep` char(20) NOT NULL COMMENT '所屬部門',
  `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
  `addr` char(30) NOT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_1` (`name`,`dep`),
  KEY `idx_2` (`name`,`gender`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='員工表'  # 注意沒有AUTO_INCREMENT=3
1 row in set (0.00 sec)

mysql> select * from staff_bak_1; # 沒有包含舊錶的數據
Empty set (0.00 sec)

  

  2.3、使用as來創建新表(包含數據)

  使用as來創建新表,有一下特點:

  1、可以有選擇性的決定新表包含哪些字段;

  2、創建的新表中,會包含舊錶的數據;

  3、創建的新表不會包含舊錶的完整性約束(比如主鍵、索引等),僅包含最基礎的表結構定義。

  用法如下:

mysql> create table staff_bak_2 as select * from staff;
Query OK, 2 rows affected (0.02 sec)
Records: 2  Duplicates: 0  Warnings: 0

mysql> select * from staff_bak_2;
+----+--------+-----+--------+--------+
| id | name   | dep | gender | addr   |
+----+--------+-----+--------+--------+
|  1 | 李明   | RD  |      1 | 北京   |
|  2 | 張三   | PM  |      0 | 上海   |
+----+--------+-----+--------+--------+
2 rows in set (0.00 sec)

mysql> show create table staff_bak_2\G
*************************** 1. row ***************************
       Table: staff_bak_2
Create Table: CREATE TABLE `staff_bak_2` (
  `id` int(11) NOT NULL DEFAULT '0' COMMENT '自增id',
  `name` char(20) CHARACTER SET utf8mb4 NOT NULL COMMENT '用戶姓名',
  `dep` char(20) CHARACTER SET utf8mb4 NOT NULL COMMENT '所屬部門',
  `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
  `addr` char(30) CHARACTER SET utf8mb4 NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1
1 row in set (0.00 sec)

  

  利用as創建表的時候沒有保留完整性約束,其實這個仔細想一下也能想明白。因為使用as創建表的時候,可以指定新表包含哪些字段呀,如果你創建新表時,忽略了幾個字段,這樣的話即使保留了完整約束,保存數據是也不能滿足完整性約束。

  比如,staff表有一個索引idx1,由name和dep字段組成;但是我創建的新表中,沒有name和dep字段(只選擇了其他字段),那麼新表中保存idx1也沒有必要,對吧。

mysql> --  只選擇id、gender、addr作為新表的字段,那麼name和dep組成的索引就沒必要存在了
mysql> create table staff_bak_3 as (select id, gender, addr from staff);
Query OK, 2 rows affected (0.02 sec)
Records: 2  Duplicates: 0  Warnings: 0

mysql> show create table staff_bak_3\G
*************************** 1. row ***************************
       Table: staff_bak_3
Create Table: CREATE TABLE `staff_bak_3` (
  `id` int(11) NOT NULL DEFAULT '0' COMMENT '自增id',
  `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
  `addr` char(30) CHARACTER SET utf8mb4 NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1
1 row in set (0.00 sec)

mysql> select * from staff_bak_3;
+----+--------+--------+
| id | gender | addr   |
+----+--------+--------+
|  1 |      1 | 北京   |
|  2 |      0 | 上海   |
+----+--------+--------+
2 rows in set (0.00 sec)

  

  2.4、使用like+insert+select創建原表的副本(推薦)

  使用like創建新表,雖然保留了舊錶的各種表結構定義以及完整性約束,但是如何將舊錶的數據導入到新表中呢?

  最極端的方式:寫一個程序,先將舊錶數據讀出來,然後寫入到新表中,這個方式我就不嘗試了。

  有一個比較簡單的命令:

mysql> select * from staff; #原表數據
+----+--------+-----+--------+--------+
| id | name   | dep | gender | addr   |
+----+--------+-----+--------+--------+
|  1 | 李明   | RD  |      1 | 北京   |
|  2 | 張三   | PM  |      0 | 上海   |
+----+--------+-----+--------+--------+
2 rows in set (0.00 sec)

mysql> select * from staff_bak_1; # 使用like創建的表,與原表相同的表結構和完整性約束(自增除外)
Empty set (0.00 sec)

mysql> insert into staff_bak_1 select * from staff;  # 將staff表的所有記錄的所有字段值都插入副本表中
Query OK, 2 rows affected (0.00 sec)
Records: 2  Duplicates: 0  Warnings: 0

mysql> select * from staff_bak_1;
+----+--------+-----+--------+--------+
| id | name   | dep | gender | addr   |
+----+--------+-----+--------+--------+
|  1 | 李明   | RD  |      1 | 北京   |
|  2 | 張三   | PM  |      0 | 上海   |
+----+--------+-----+--------+--------+
2 rows in set (0.00 sec)

  

  其實這條SQL語句,是知道兩個表的表結構和完整性約束相同,所以,可以直接select *。

insert into staff_bak_1 select * from staff;

  

  如果兩個表結構不相同,其實也是可以這個方式的,比如:

mysql> show create table demo\G
*************************** 1. row ***************************
       Table: demo
Create Table: CREATE TABLE `demo` (
  `_id` int(11) NOT NULL AUTO_INCREMENT,
  `_name` char(20) DEFAULT NULL,
  `_gender` tinyint(4) DEFAULT '1',
  PRIMARY KEY (`_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
1 row in set (0.00 sec)

# 只將staff表中的id和name字段組成的數據記錄插入到demo表中,對應_id和_name字段
mysql> insert into demo (_id, _name) select id,name from staff;
Query OK, 2 rows affected (0.00 sec)
Records: 2  Duplicates: 0  Warnings: 0

mysql> select * from demo;
+-----+--------+---------+
| _id | _name  | _gender |
+-----+--------+---------+
|   1 | 李明   |       1 |
|   2 | 張三   |       1 |
+-----+--------+---------+
2 rows in set (0.00 sec)

  這是兩個表的字段數量不相同的情況,此時需要手動指定列名,否則就會報錯

 

  另外,如果兩個表的字段數量,以及相同順序的字段類型相同,如果是全部字段複製,即使字段名不同,也可以直接複製

# staff_bak_5的字段名與staff表並不相同,但是字段數量、相同順序字段的類型相同,所以可以直接插入
mysql> show create table staff_bak_5\G
*************************** 1. row ***************************
       Table: staff_bak_5
Create Table: CREATE TABLE `staff_bak_5` (
  `_id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `_name` char(20) NOT NULL COMMENT '用戶姓名',
  `_dep` char(20) NOT NULL COMMENT '所屬部門',
  `_gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
  `_addr` char(30) NOT NULL,
  PRIMARY KEY (`_id`),
  KEY `idx_1` (`_name`,`_dep`),
  KEY `idx_2` (`_name`,`_gender`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COMMENT='員工表'
1 row in set (0.00 sec)

mysql> insert into staff_bak_5 select * from staff;
Query OK, 2 rows affected (0.00 sec)
Records: 2  Duplicates: 0  Warnings: 0

mysql> select * from staff_bak_5;
+-----+--------+------+---------+--------+
| _id | _name  | _dep | _gender | _addr  |
+-----+--------+------+---------+--------+
|   1 | 李明   | RD   |       1 | 北京   |
|   2 | 張三   | PM   |       0 | 上海   |
+-----+--------+------+---------+--------+
2 rows in set (0.00 sec)

  

  

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整